Wie Marius vor wenigen Tagen beschrieben hat, sind bei seinen ersten Fahrten mit Smart Drive keine bzw. nicht alle Messdaten in der Datenbank und somit im Frontend der Kunden angekommen. Nach der Auswertung des Log des Device in der Balena Cloud können wir davon ausgehen, dass die Messwerte durch den Pi auch während der Fahrt über den mobilen Internetzugang als MQTT-Messages verschickt worden sind. Daher muss der Fehler im Backend gesucht werden.

Das Ansehen der Konsolenausgabe eines als Daemon laufenden Docker Containers geschieht mit dem Befehl

docker logs iot-receiver -f


Dabei ist iot-receiver der Name eines Docker Containers und f steht für follow, es werden also nicht nur vergangene Ausgaben angezeigt, sondern auch alle neu ausgegebenen Zeilen bis man mit Strg+C beendet.

Problem 1

Bei Durchsicht der Ausgabe unseres MQTT-Receivers wird deutlich, dass zwar alle Nachrichten durch den Receiver empfangen worden sind, jedoch nicht in die Datenbank geschrieben werden konnten. Eine entsprechende Warnung wurde ab dem Zeitpunkt des Fahrtbeginns für jede Nachricht ausgegeben. Bei den bisherigen Tests wurde alle 2 Sekunden eine Messung durchgeführt und diese über eine stabile Internetverbindung versendet. Für den Einsatz im Auto wurde die Anzahl der Messungen auf 4 Messungen pro Sekunde erhöht. Außerdem wurde die weniger stabile Internetverbindung eines Smartphones benutzt.

Zunächst wurden die Auswirkungen des höheren Messintervalls geprüft. Da das Intervall einige Minuten vor Fahrtbegin umgestellt wurde und zunächst alles weiterhin funktioniert hat, können wir dies als Problemursache eher ausschließen. Zum weiteren Test wurde das Intervall jedoch noch einmal deutlich verkürzt, nämlich auf 0,01 Sekunden, also 100 Messvorgänge pro Sekunde. Dabei ist zu beobachten, dass der Pi nicht annähernd soviele Messungen pro Sekunde schafft, jedoch ca 30 MQTT-Messages pro Sekunde übertragen werden und die Werte ohne Probleme in die Datenbank geschrieben werden können.

Als zweite mögliche Ursache des Problems kommt die instabile Internetverbindung in Betracht. Zwar werden auf dem Pi die Messwerte zwischengespeichert und dann bei Neuaufbau der Verbindung versandt, jedoch passiert an dieser Stelle ein entscheidender Fehler. Bisher wurde angenommen, dass das Attribut _id den Zeitpunkt der Messung als Zeitstempel beinhaltet. Diese Annahme ist leider falsch und führt zu einem fatalen Fehler. Der Zeitstempel gibt den Zeitpunkt des Versands der Nachricht an. Konnten nun einige Nachrichten nicht versandt werden und müssen in einem Block als Liste versandt werden, berechnet der MQTT-Receiver für alle Nachrichten einer Liste den identischen Zeitstempel und verwendet diesen als Teil des Primärschlüssels einer Zeile in der Datenbank. Da der Primärschlüssel aus dem Zeitstempel und der Device ID besteht, wird dann versucht mehrere Zeilen mit identischem Primärschlüssel in die Datenbank zu schreiben.

Lösung

Zur Lösung des Problems wurde im Service sensor auf dem Device ein Zeitstempel zu jedem Messwert hinzugefügt. Dazu wird die Methode time() des gleichnamigen Python-Moduls verwendet. Diese gibt die aktuelle Zeit als UNIX-Timestamp mit sechs Stellen nach dem Komma als float-Wert zurück. Danach wird der Messwert in das zu versendende Dictionary aufgenommen und bekommt den Schlüssel Timestamp. Nun kann dieses Attribut im MQTT-Receiver statt _id verwendet werden. Da der Wert bereits mit Nachkommastellen übermittelt wird, ist keine Division mehr notwendig.

Mit großer Hoffnung wurde nun eine weitere Testfahrt durchgeführt, die leider ein weiteres Problem aufzeigte.

Problem 2

Bei der zweiten Fahrt sind nicht alle Messwerte verloren gegangen. Ab Fahrtbeginn konnten die Messwerte jedoch zunächst wieder nicht in die Datenbank geschrieben werden. Nach einem Neustart des MQTT-Receivers trat der Fehler jedoch nicht mehr auf. Die Fehlerursache war diesmal nicht so leicht zu finden, da es nicht gelungen ist, das Problem nachzustellen. Daher die folgenden Gedanken zur möglichen Ursache des Problems:

  • Im MQTT-Receiver wird die Methode loop_forever() verwendet, der Empfang von Nachrichten also im Main-Thread durchgeführt.
  • Die Callback-Methode, die für jede empfangene Nachricht aufgerufen wird läuft somit im selben Thread
  • Die Callback-Methode ist teuer, da zunächst die empfangene Zeichenkette als JSON geparst wird, anschließend über die empfangene Liste iteriert wird, für jeden Messwert 7 Typecasts der numerischen Werte durchgeführt werden und jeder Messwert einzeln in die Datenbank geschrieben wird.
  • Bei instabiler Internetanbindung des Device und 4 Messwerten pro Sekunde sammeln sich in einer MQTT-Nachricht bereits bei kurzem Verbindungsausfall viele Messwerte.
  • Bei erneutem Verbindungsaufbau wird dann eine Liste mit vielen Messwerten versandt und sehr schnell danach weitere Messwerte.

Ich vermute, dass es durch die Ausführung in einem einzigen Thread zu einer Überschneidung zwischen der noch laufenden Verarbeitung einer MQTT-Message mit mehreren Messwerten und dem Empfang der darauffolgen MQTT-Message kommt. Dadurch könnte ein SQL-Statement nicht korrekt ausgeführt werden und somit der Cursor der Datenbankverbindung unbrauchbar werden, sodass keine folgenden Werte mehr in die Datenbank geschrieben werden können. Hierzu ist zu beachten, dass PostgreSQL nach jedem SQL-Statement ein Semikolon erwartet. Wird dieses nicht gesetzt, wird das Statement als unvollständig interpretiert und das nachfolgende Statement angehängt.

Lösung

Zur Lösung des Problems bei der vermuteten Ursache, wird der Empfang von MQTT-Messages in einen eigenen Thread ausgelagert. Wichtig dabei ist, dass die Callback-Methode so umgeschrieben wird, dass die Verarbeitung der Nachrichten im Main-Thread verbleibt und der neue Thread somit nicht blockiert.

Dazu wird die Methode loop_start() des MQTT-Clients verwendet, da diese automatisch einen eigenen Thread verwendet und den Main-Thread nicht blockiert. Somit ist es möglich den Main-Thread zur Kommunikation mit der Datenbank einzusetzen. Dazu ist jedoch eine Übermittlung der empfangenenen MQTT-Messages zwischen beiden Threads notwendig. Als Datenstruktur bietet sich dafür eine Warteschlange, di nach dem FIFO-Prinzip funktioniert, an. Eine solche Queue ist im Python Modul Queue enthalten. Für die Verwendung wichtig ist das Anstellen eines Objekts an hinterer Position sowie das Herausnehmen eines Objekts von der vorderen Position. Dazu stehen die Methoden put() und get() zur Verfügung. Dabei ist zu beachten, dass beide Methoden solange blockieren, bis die Aktion tatsächlich ausgeführt werden konnte. Dieser Effekt ist beim Herausnehmen eines Objekts erwünscht, damit das Programm immer solange wartet bis tatsächlich ein Objekt in der Warteschlange vorhanden ist. Beim Anstellen eines Objekts sollte der Effekt theoretisch nicht auftreten, da die Länge der Warteschlange nicht begrenzt wird, damit dies auch in der Praxis nicht vorkommen kann, wird statt der Methode put() die Methode put_nowait verwendet. Diese blockiert den Thread nicht, sondern wirft sofort einen Fehler, wenn kein Element an die Warteschlange angehängt werden kann.

Die vom MQTT-Client aufgerufene Callback-Methode beim Empfang einer Nachricht stellt diese nur noch an die Warteschlange an und übernimmt keine weitere Verarbeitung mehr. Damit wird der Thread zum Empfang der Nachrichten nicht blockiert, auch wenn die Verarbeitung länger dauert. In diesem Fall sammeln sich dann Elemente in der Queue und werden geordnet nacheinander verarbeitet. Im Main-Thread wird in einer Endlosschleife auf Elemente in der Queue gewartet. Ist kein Element in der Queue angestellt, wird der Thread durch die Methode get() der Queue blockiert. Ist ein Element vorhanden, wird dieses verarbeitet und erst nach Abschluss der Verarbeitung ein neues aus der Queue genommen. Dieses Verhalten verdeutlicht der folgende Code-Ausschnitt

while True:
    self._log.info(f"Queued Messages: {self._message_queue.qsize()}")
    messages_json = self._message_queue.get()
    message_list = json.loads(messages_json)
    for m in message_list:
        self._write_message_to_db(
            float(m['Timestamp']),
            m['_device_uuid'],
            float(m['X_acceleration']),
            float(m['Y_acceleration']),
            float(m['Z_acceleration']),
            int(m['X_rotation']),
            int(m['Y_rotation']),
            int(m['Z_rotation'])

Um die weitere Analyse zu vereinfachen, sollte der Fehler erneut auftreten, wurden außerdem die Ausgaben der except-Blöcke aussagekräftiger gestaltet. Statt einer selbst definierten Ausgabe können wie im folgenden Beispiel dargestellt zusätzlich Informationen zur aufgetretenen Exception als Log ausgegeben werden, sodass die Fehlerursache besser eingrenzbar ist.

try:
    pass # Hier Code, der Exception wirft
except Exception as e:
    print("Fehler aufgetreten"
    print(e))

Mit print(e) werden die Informationen ausgegeben, die bei nicht abgefangenen Fehlern in der Konsole erscheinen. Hier kann aber trotzdem auf das Problem reagiert und die Ausführung des Programms fortgesetzt werden.

Als weitere Sicherheit wurde im except-Block der Datenbankkommunikation eingeführt, dass der Cursor geschlossen wird und anschlißend ein neues Cursor-Objekt instanziiert wird. Dadurch wird ein Cursor, der einmal einen Fehler produziert hat, nicht weiter verwendet und kann somit als Fehlerursache für folgende Datenbankkommunikation ausgeschlossen werden.

Datenverlust beim Feldversuch – Suche nach dem Problem

Schreibe einen Kommentar