clean up mqtt stuff

fix workflow
This commit is contained in:
lumapu 2022-11-12 02:29:00 +01:00
parent 65762a6e17
commit 24de3ac39b
8 changed files with 140 additions and 138 deletions

View file

@ -18,6 +18,8 @@
#endif
#include "../utils/dbg.h"
#include "../utils/ahoyTimer.h"
#include "../config/config.h"
#include <PubSubClient.h>
#include "../defines.h"
#include "../hm/hmSystem.h"
@ -38,18 +40,34 @@ class mqtt {
~mqtt() { }
void setup(mqttConfig_t *cfg, const char *devname, HMSYSTEM *sys) {
void setup(mqttConfig_t *cfg, const char *devname, const char *version, HMSYSTEM *sys, uint32_t *utcTs) {
DPRINTLN(DBG_VERBOSE, F("mqtt.h:setup"));
mAddressSet = true;
mCfg = cfg;
snprintf(mDevName, DEVNAME_LEN, "%s", devname);
mSys = sys;
snprintf(mDevName, DEVNAME_LEN, "%s", devname);
mCfg = cfg;
mSys = sys;
mUtcTimestamp = utcTs;
mClient->setServer(mCfg->broker, mCfg->port);
mClient->setBufferSize(MQTT_MAX_PACKET_SIZE);
setCallback(std::bind(&mqtt<HMSYSTEM>::cbMqtt, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
sendMsg("version", version);
sendMsg("device", devname);
sendMsg("uptime", "0");
}
void loop() {
if(mAddressSet) {
if(!mClient->connected())
reconnect();
mClient->loop();
if(!mSendList.empty())
sendIvData();
}
}
void setCallback(MQTT_CALLBACK_SIGNATURE) {
@ -84,20 +102,15 @@ class mqtt {
return mClient->connected();
}
void loop() {
//DPRINT(F("m"));
if(mAddressSet) {
if(!mClient->connected())
reconnect();
mClient->loop();
}
void payloadEventListener(uint8_t cmd) {
mSendList.push(cmd);
}
uint32_t getTxCnt(void) {
return mTxCnt;
}
void sendMqttDiscoveryConfig(const char *topic, uint32_t invertval) {
void sendMqttDiscoveryConfig(const char *topic) {
DPRINTLN(DBG_VERBOSE, F("sendMqttDiscoveryConfig"));
char stateTopic[64], discoveryTopic[64], buffer[512], name[32], uniq_id[32];
@ -131,7 +144,7 @@ class mqtt {
doc["unit_of_meas"] = iv->getUnit(i, rec);
doc["uniq_id"] = String(iv->serial.u64, HEX) + "_" + uniq_id;
doc["dev"] = deviceObj;
doc["exp_aft"] = invertval + 5; // add 5 sec if connection is bad or ESP too slow @TODO: stimmt das wirklich als expire!?
doc["exp_aft"] = MQTT_INTERVAL + 5; // add 5 sec if connection is bad or ESP too slow @TODO: stimmt das wirklich als expire!?
if (devCls != NULL)
doc["dev_cla"] = devCls;
if (stateCls != NULL)
@ -148,7 +161,62 @@ class mqtt {
}
}
void sendIvData(uint32_t mUtcTs, std::queue<uint8_t> list) {
private:
void reconnect(void) {
DPRINTLN(DBG_DEBUG, F("mqtt.h:reconnect"));
DPRINTLN(DBG_DEBUG, F("MQTT mClient->_state ") + String(mClient->state()) );
#ifdef ESP8266
DPRINTLN(DBG_DEBUG, F("WIFI mEspClient.status ") + String(mEspClient.status()) );
#endif
boolean resub = false;
if(!mClient->connected() && (millis() - mLastReconnect) > MQTT_RECONNECT_DELAY ) {
mLastReconnect = millis();
if(strlen(mDevName) > 0) {
// der Server und der Port müssen neu gesetzt werden,
// da ein MQTT_CONNECTION_LOST -3 die Werte zerstört hat.
mClient->setServer(mCfg->broker, mCfg->port);
mClient->setBufferSize(MQTT_MAX_PACKET_SIZE);
char lwt[MQTT_TOPIC_LEN + 7 ]; // "/uptime" --> + 7 byte
snprintf(lwt, MQTT_TOPIC_LEN + 7, "%s/uptime", mCfg->topic);
if((strlen(mCfg->user) > 0) && (strlen(mCfg->pwd) > 0))
resub = mClient->connect(mDevName, mCfg->user, mCfg->pwd, lwt, 0, false, "offline");
else
resub = mClient->connect(mDevName, lwt, 0, false, "offline");
// ein Subscribe ist nur nach einem connect notwendig
if(resub) {
char topic[MQTT_TOPIC_LEN + 13 ]; // "/devcontrol/#" --> + 6 byte
// ToDo: "/devcontrol/#" is hardcoded
snprintf(topic, MQTT_TOPIC_LEN + 13, "%s/devcontrol/#", mCfg->topic);
DPRINTLN(DBG_INFO, F("subscribe to ") + String(topic));
mClient->subscribe(topic); // subscribe to mTopic + "/devcontrol/#"
}
}
}
}
const char *getFieldDeviceClass(uint8_t fieldId) {
uint8_t pos = 0;
for (; pos < DEVICE_CLS_ASSIGN_LIST_LEN; pos++) {
if (deviceFieldAssignment[pos].fieldId == fieldId)
break;
}
return (pos >= DEVICE_CLS_ASSIGN_LIST_LEN) ? NULL : deviceClasses[deviceFieldAssignment[pos].deviceClsId];
}
const char *getFieldStateClass(uint8_t fieldId) {
uint8_t pos = 0;
for (; pos < DEVICE_CLS_ASSIGN_LIST_LEN; pos++) {
if (deviceFieldAssignment[pos].fieldId == fieldId)
break;
}
return (pos >= DEVICE_CLS_ASSIGN_LIST_LEN) ? NULL : stateClasses[deviceFieldAssignment[pos].stateClsId];
}
void sendIvData() {
isConnected(true); // really needed? See comment from HorstG-57 #176
char topic[32 + MAX_NAME_LENGTH], val[32];
float total[4];
@ -158,26 +226,26 @@ class mqtt {
sendMsg("uptime", val);
if(list.empty())
if(mSendList.empty())
return;
while(!list.empty()) {
while(!mSendList.empty()) {
memset(total, 0, sizeof(float) * 4);
for (uint8_t id = 0; id < mSys->getNumInverters(); id++) {
Inverter<> *iv = mSys->getInverterByPos(id);
if (NULL == iv)
continue; // skip to next inverter
record_t<> *rec = iv->getRecordStruct(list.front());
record_t<> *rec = iv->getRecordStruct(mSendList.front());
if(list.front() == RealTimeRunData_Debug) {
if(mSendList.front() == RealTimeRunData_Debug) {
// inverter status
uint8_t status = MQTT_STATUS_AVAIL_PROD;
if (!iv->isAvailable(mUtcTs, rec)) {
if (!iv->isAvailable(*mUtcTimestamp, rec)) {
status = MQTT_STATUS_NOT_AVAIL_NOT_PROD;
totalIncomplete = true;
}
else if (!iv->isProducing(mUtcTs, rec)) {
else if (!iv->isProducing(*mUtcTimestamp, rec)) {
if (MQTT_STATUS_AVAIL_PROD == status)
status = MQTT_STATUS_AVAIL_NOT_PROD;
}
@ -200,14 +268,14 @@ class mqtt {
}
// data
if(iv->isAvailable(mUtcTs, rec)) {
if(iv->isAvailable(*mUtcTimestamp, rec)) {
for (uint8_t i = 0; i < rec->length; i++) {
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/ch%d/%s", iv->name, rec->assign[i].ch, fields[rec->assign[i].fieldId]);
snprintf(val, 10, "%.3f", iv->getValue(i, rec));
sendMsg(topic, val);
// calculate total values for RealTimeRunData_Debug
if (list.front() == RealTimeRunData_Debug) {
if (mSendList.front() == RealTimeRunData_Debug) {
if (CH0 == rec->assign[i].ch) {
switch (rec->assign[i].fieldId) {
case FLD_PAC:
@ -231,7 +299,7 @@ class mqtt {
}
}
list.pop(); // remove from list once all inverters were processed
mSendList.pop(); // remove from list once all inverters were processed
if ((true == sendTotal) && (false == totalIncomplete)) {
uint8_t fieldId;
@ -259,61 +327,6 @@ class mqtt {
}
}
private:
void reconnect(void) {
DPRINTLN(DBG_DEBUG, F("mqtt.h:reconnect"));
DPRINTLN(DBG_DEBUG, F("MQTT mClient->_state ") + String(mClient->state()) );
#ifdef ESP8266
DPRINTLN(DBG_DEBUG, F("WIFI mEspClient.status ") + String(mEspClient.status()) );
#endif
boolean resub = false;
if(!mClient->connected() && (millis() - mLastReconnect) > MQTT_RECONNECT_DELAY ) {
mLastReconnect = millis();
if(strlen(mDevName) > 0) {
// der Server und der Port müssen neu gesetzt werden,
// da ein MQTT_CONNECTION_LOST -3 die Werte zerstört hat.
mClient->setServer(mCfg->broker, mCfg->port);
mClient->setBufferSize(MQTT_MAX_PACKET_SIZE);
char lwt[MQTT_TOPIC_LEN + 7 ]; // "/uptime" --> + 7 byte
snprintf(lwt, MQTT_TOPIC_LEN + 7, "%s/uptime", mCfg->topic);
if((strlen(mCfg->user) > 0) && (strlen(mCfg->pwd) > 0))
resub = mClient->connect(mDevName, mCfg->user, mCfg->pwd, lwt, 0, false, "offline");
else
resub = mClient->connect(mDevName, lwt, 0, false, "offline");
// ein Subscribe ist nur nach einem connect notwendig
if(resub) {
char topic[MQTT_TOPIC_LEN + 13 ]; // "/devcontrol/#" --> + 6 byte
// ToDo: "/devcontrol/#" is hardcoded
snprintf(topic, MQTT_TOPIC_LEN + 13, "%s/devcontrol/#", mCfg->topic);
DPRINTLN(DBG_INFO, F("subscribe to ") + String(topic));
mClient->subscribe(topic); // subscribe to mTopic + "/devcontrol/#"
}
}
}
}
const char *getFieldDeviceClass(uint8_t fieldId) {
uint8_t pos = 0;
for (; pos < DEVICE_CLS_ASSIGN_LIST_LEN; pos++) {
if (deviceFieldAssignment[pos].fieldId == fieldId)
break;
}
return (pos >= DEVICE_CLS_ASSIGN_LIST_LEN) ? NULL : deviceClasses[deviceFieldAssignment[pos].deviceClsId];
}
const char *getFieldStateClass(uint8_t fieldId) {
uint8_t pos = 0;
for (; pos < DEVICE_CLS_ASSIGN_LIST_LEN; pos++) {
if (deviceFieldAssignment[pos].fieldId == fieldId)
break;
}
return (pos >= DEVICE_CLS_ASSIGN_LIST_LEN) ? NULL : stateClasses[deviceFieldAssignment[pos].stateClsId];
}
void cbMqtt(char *topic, byte *payload, unsigned int length) {
// callback handling on subscribed devcontrol topic
DPRINTLN(DBG_INFO, F("cbMqtt"));
@ -420,12 +433,14 @@ class mqtt {
WiFiClient mEspClient;
PubSubClient *mClient;
HMSYSTEM *mSys;
uint32_t *mUtcTimestamp;
bool mAddressSet;
mqttConfig_t *mCfg;
char mDevName[DEVNAME_LEN];
uint32_t mLastReconnect;
uint32_t mTxCnt;
std::queue<uint8_t> mSendList;
};
#endif /*__MQTT_H_*/