
Internet of Things Applications with MQTT and nanopb Protocol Buffers

In this post we illustrate how to use nanopb, a small-footprint implementation of Google’s Protocol Buffers, in combination with MQTT-based data transmission on a small embedded platform. For simplicity, an Arduino Ethernet Rev. 3 has been chosen.

As described earlier, communication over the Hypertext Transfer Protocol (HTTP) can be a serious bottleneck for microcontrollers. A worthwhile alternative is MQ Telemetry Transport (MQTT), which reduces the overhead dramatically and is better suited to IoT applications. However, MQTT is only a transport protocol and does not define how the data is represented. One therefore has to define a content format on top of MQTT that is machine-readable, reliable and small in size.

Google’s Protocol Buffers let you encode structured information in a compact binary format that can be exchanged across platforms and implementation languages, which makes them suitable for Internet of Things applications.

Message definition

In this example we want to create a sensor network that collects climate data such as temperature, humidity and air pressure from multiple sensors. The message prototype is therefore defined as follows. It contains some general information about the sensor and multiple measurements. A measurement can contain data for all three measured variables, or only for a subset. Each measurement carries a Unix timestamp.

package huygens;
option java_package = "com.noser.edays.huygens";
 
message HuygensMessage {
     
    required int32 messageId = 1;
    required int32 sensorId = 2;
    required int32 timestamp = 3;
     
    message SensorValue {
        required int32 timestamp = 1;
        optional float temperature = 2;
        optional float humidity = 3;
        optional float pressure = 4;
    } 

    enum SensorState {
        ON = 1;
        OFF = 2;
        ERROR = 3;
    }

    repeated SensorValue sensorValues = 4;

    required SensorState sensorState = 5;
}

After writing the message definition you need to compile the proto file, so that you get the C definition files that you can use on your microcontroller (huygens.pb.h and huygens.pb.c):

user@host:~$ protoc -ohuygens.pb huygens.proto
user@host:~$ python nanopb/generator/nanopb_generator.py huygens.pb

Globals

The following headers, definitions and global variables are used. For the measurements, a global array of MAX_MEASUREMENTS SensorValue submessages is used; one element is filled at each measurement. A measurement is taken every two seconds (T_READ). For the MQTT communication, the PubSubClient library by Nick O’Leary is used. As this application only sends messages, the callback function that handles incoming MQTT messages is left empty.

#include <Arduino.h>
#include <pins_arduino.h>
#include "Ethernet.h"
#include "huygens.pb.h"
#include "PubSubClient.h"
#include "pb_encode.h"

#define MSG_SIZE 128
#define MAX_MEASUREMENTS 5
#define HUYGENS_TOPIC "Huygens"
#define MQTT_MAX_PACKET_SIZE 512
#define T_READ 2
#define SENSOR_ID 42

_huygens_HuygensMessage_SensorValue vals[MAX_MEASUREMENTS];
uint8_t cursor = 0;
uint32_t msgId = 1;

byte _mac[] = {0x90,0xA2,0xDA, 0x0D,0x8E,0xC0};
byte _ip[] = {192,168,1,121};

byte server[] = { 192, 168, 1, 100 };
EthernetClient ethClient;

uint32_t timeOffset;

void mqtt_callback(char* topic, byte* payload, unsigned int length){
	return;
}

PubSubClient client(server, 1883, mqtt_callback, ethClient);

Measuring the Environment

At each interrupt call the values are read from the sensors using the helper functions getHumidity(), getTemperature() and getPressure(), which are wrappers for analogRead() with some scaling, and stored in the next array element of the global measurement buffer vals[]. The timestamp is calculated using the helper function getTimestamp(), which is basically a wrapper for millis() that adds the offset to get a correct Unix timestamp.

After the measurement the cursor is shifted for the next measurement. If the buffer is full, the message is sent using sendMessage() and the cursor is reset to the first element.

ISR(TIMER1_COMPA_vect){
    Serial.println(F("Reading Sensors"));

    vals[cursor].timestamp = getTimestamp();

    float h = getHumidity(); 
    float t = getTemperature(); 
    float p = getPressure();

    if (!isnan(t)) {
    	vals[cursor].has_temperature = true;
    	vals[cursor].temperature = t;
    }

    if (!isnan(h)) {
    	vals[cursor].has_humidity = true;
    	vals[cursor].humidity = h;
    }

    if (!isnan(p)) {
    	vals[cursor].has_pressure = true;
    	vals[cursor].pressure = p;
    }
    if (cursor < MAX_MEASUREMENTS - 1) {
	    ++cursor;
    } else {
	    cursor = 0;
	    sendMessage();
    }
}
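The fill-and-flush behaviour of the cursor can be tried out on a desktop machine without any sensor hardware. The following is a minimal sketch of that logic only; the MeasurementBuffer struct, its add() method and the flushes counter are hypothetical names introduced for illustration and are not part of the original sketch.

```cpp
#include <cassert>
#include <cstdint>

// Host-side model of the measurement buffer. The constant mirrors the
// sketch; the struct itself is a hypothetical stand-in for vals[] and cursor.
constexpr uint8_t MAX_MEASUREMENTS = 5;

struct MeasurementBuffer {
    uint32_t timestamps[MAX_MEASUREMENTS] = {};
    uint8_t cursor = 0;    // index of the next slot to be written
    uint32_t flushes = 0;  // how often the "send" step was triggered

    // Stores one timestamp. Returns true when this call filled the last
    // slot, which is the point where the sketch calls sendMessage().
    bool add(uint32_t timestamp) {
        assert(cursor < MAX_MEASUREMENTS);
        timestamps[cursor] = timestamp;
        if (cursor < MAX_MEASUREMENTS - 1) {
            ++cursor;
            return false;
        }
        cursor = 0;
        ++flushes;
        return true;
    }
};
```

With T_READ set to 2 seconds and MAX_MEASUREMENTS set to 5, the last slot is filled on every fifth call, so a message is sent roughly every 10 seconds.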

Sending a Message

When a message is sent, a message struct msg and the output stream (backed by the buffer array sMsg) are initialised first. Then the header fields are filled and the message is encoded. Afterwards the message is published to the MQTT broker. If that succeeds, the measurement buffer is reset and the message ID is incremented for the following message.

void sendMessage() {
	Serial.println(F("Sending Message"));

	_huygens_HuygensMessage msg;
	uint8_t sMsg[MSG_SIZE];
	pb_ostream_t buffer = pb_ostream_from_buffer(sMsg, sizeof(sMsg));

	msg.timestamp = getTimestamp();
	msg.sensorId = SENSOR_ID;
	msg.messageId = msgId;

	msg.sensorState = huygens_HuygensMessage_SensorState_ON;

	//Map the callback function to encode the submessages for sensorValues
	msg.sensorValues.funcs.encode = &sensorValues_callback;
	msg.sensorValues.arg = NULL;

	//Encode the message; do not publish a partially encoded buffer
	if (!pb_encode(&buffer, huygens_HuygensMessage_fields, &msg)) {
		Serial.println(F("Encoding failed"));
		Serial.println(PB_GET_ERROR(&buffer));
		return;
	}

	//Publish everything to the MQTT broker
	if (!client.publish(HUYGENS_TOPIC, sMsg, buffer.bytes_written)) {
		Serial.println(F("Publishing Failed"));
	} else {
		resetValues();
		++msgId;
	}
}
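Whether a 128-byte buffer (MSG_SIZE) is large enough can be estimated from the Protocol Buffers wire format: a varint field costs one key byte plus one byte per 7 bits of value, a float costs one key byte plus 4 bytes, and a submessage costs one key byte, a length varint and its payload. The sketch below works through that arithmetic. It assumes all values are non-negative (negative int32 values take 10 bytes as varints) and all field numbers are below 16, as in the message definition above; the function names are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>

// Number of bytes a non-negative value occupies as a protobuf varint
// (7 payload bits per byte).
std::size_t varintSize(uint64_t value) {
    std::size_t n = 1;
    while (value >= 0x80) {
        value >>= 7;
        ++n;
    }
    return n;
}

// One varint field: a single key byte (field numbers below 16) plus the value.
std::size_t varintFieldSize(uint64_t value) {
    return 1 + varintSize(value);
}

// Worst case for one SensorValue: timestamp plus all three float fields.
std::size_t sensorValueSize(uint32_t timestamp) {
    const std::size_t floats = 3 * (1 + 4);  // key byte + fixed32 each
    const std::size_t payload = varintFieldSize(timestamp) + floats;
    return 1 + varintSize(payload) + payload;  // key + length + payload
}

// Estimated size of a complete HuygensMessage with numValues measurements.
std::size_t messageSize(uint32_t messageId, uint32_t sensorId,
                        uint32_t timestamp, uint32_t sensorState,
                        std::size_t numValues) {
    return varintFieldSize(messageId)
         + varintFieldSize(sensorId)
         + varintFieldSize(timestamp)
         + numValues * sensorValueSize(timestamp)
         + varintFieldSize(sensorState);
}
```

Under these assumptions, messageSize(1, 42, 1400000000, 1, 5) comes to 127 bytes, which just fits into 128. Once messageId reaches 16384 its varint grows to three bytes and the estimate becomes 129 bytes, so a somewhat larger MSG_SIZE may be the safer choice for long-running sensors.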

As seen above, a callback function is required to encode the repeated submessages. In our case, the callback iterates over the measurement buffer and encodes each measurement into the output stream of the parent message. Slots whose timestamp is zero are treated as empty and skipped.

bool sensorValues_callback(pb_ostream_t *stream, 
						const pb_field_t *field, 
						void * const *arg) {
	for (int i=0; i<MAX_MEASUREMENTS; ++i) {
		if (vals[i].timestamp != 0) {
		    if (!pb_encode_tag_for_field(stream, field)) return false;
		    if (!pb_encode_submessage(stream, 
		                   huygens_HuygensMessage_SensorValue_fields, 
		                   &vals[i])) 
		        return false;
		}
	}
	return true;
}
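For each measurement, the callback writes a field key followed by a length-prefixed submessage. The following host-side sketch shows what that framing looks like at the byte level according to the Protocol Buffers wire format. It is an illustration only and not how nanopb is implemented internally; appendVarint() and frameSubmessage() are hypothetical helper names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Appends value as a protobuf varint: 7 bits per byte, least significant
// group first, high bit set on every byte except the last.
void appendVarint(std::vector<uint8_t>& out, uint64_t value) {
    while (value >= 0x80) {
        out.push_back(static_cast<uint8_t>((value & 0x7F) | 0x80));
        value >>= 7;
    }
    out.push_back(static_cast<uint8_t>(value));
}

// Wire type 2 marks length-delimited data (strings, bytes, submessages).
constexpr uint32_t WIRE_TYPE_LENGTH_DELIMITED = 2;

// Frames an already encoded submessage: key, payload length, payload.
std::vector<uint8_t> frameSubmessage(uint32_t fieldNumber,
                                     const std::vector<uint8_t>& payload) {
    assert(fieldNumber >= 1);
    std::vector<uint8_t> out;
    // The key is the varint of (field number << 3) | wire type.
    appendVarint(out, (static_cast<uint64_t>(fieldNumber) << 3)
                          | WIRE_TYPE_LENGTH_DELIMITED);
    appendVarint(out, payload.size());
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}
```

For sensorValues (field number 4) the key byte is 0x22. A SensorValue that only contains timestamp = 1 encodes to the two bytes 0x08 0x01, so the framed result is 0x22 0x02 0x08 0x01.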

To reset the measurement buffer after sending, each field is set to zero. The cursor is also reset to the first element.

void resetValues() {
    cursor = 0;
	for (int i=0;i<MAX_MEASUREMENTS;i++) {
		vals[i].timestamp = 0;
		vals[i].has_humidity = false;
		vals[i].has_pressure = false;
		vals[i].has_temperature = false;
		vals[i].humidity = 0;
		vals[i].pressure = 0;
		vals[i].temperature = 0;
	}
}

Setup and Loop

The setup function is quite simple: it initialises the serial port, the Ethernet interface, the MQTT connection and the timer interrupt. As the messages require a Unix timestamp, the current time is fetched from an NTP server using ntpUnixTime(), as described on the Arduino Playground, and the difference to the uptime is stored in timeOffset. secs() is a small helper function that returns the elapsed time in seconds since startup, i.e. millis()/1000.
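The helpers secs() and getTimestamp() are not listed in this post. A minimal sketch of the arithmetic they are described as performing might look as follows. To keep it testable off the board, the millis() reading is passed in as a parameter; all three function names are hypothetical and stand in for the real helpers.

```cpp
#include <cstdint>

// Elapsed whole seconds for a given millis() reading.
uint32_t secsFromMillis(uint32_t ms) {
    return ms / 1000UL;
}

// Offset computed once at startup: NTP time minus the uptime at that moment.
uint32_t computeTimeOffset(uint32_t ntpUnixTime, uint32_t msAtSync) {
    return ntpUnixTime - secsFromMillis(msAtSync);
}

// Unix timestamp for a later millis() reading.
uint32_t unixTimestamp(uint32_t timeOffset, uint32_t msNow) {
    return timeOffset + secsFromMillis(msNow);
}
```

For example, if NTP reports 1400000000 when the board has been running for 3500 ms, the offset is 1399999997, and a reading taken at 13500 ms maps to 1400000010. Note that millis() wraps around after roughly 49.7 days, so a long-running sensor would need to refresh the offset periodically. The actual setup() of the sketch follows.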

void setup() {
	Serial.begin(9600);
	Serial.println(F("Starting Ethernet"));

	Ethernet.begin(_mac, _ip);

	Serial.println(F("Connecting to MQTT Broker"));
	if (!client.connect("HuygensArduino")){
		Serial.println(F("Connection failed"));
	}

	Serial.println(F("Getting NTP time Offset:"));

	EthernetUDP udp;
	timeOffset = ntpUnixTime(udp) - secs();

	Serial.println(timeOffset);

	Serial.println(F("Setting up Clock Interrupt"));

	// Clock Interrupt setup
	cli();
	TCCR1A = 0;// set entire TCCR1A register to 0
	TCCR1B = 0;// same for TCCR1B
	TCNT1  = 0;//initialize counter value to 0
	// set compare match register for one interrupt every T_READ seconds
	OCR1A =  16000000 * T_READ / 1024 - 1;// (must be <65536)
	// turn on CTC mode
	TCCR1B |= (1 << WGM12);
	// Set CS12 and CS10 bits for 1024 prescaler
	TCCR1B |= (1 << CS12) | (1 << CS10);
	// enable timer compare interrupt
	TIMSK1 |= (1 << OCIE1A);
	sei();

	Serial.println(F("Arduino Ready"));

}

void loop() {

}

As all the work is triggered by the clock interrupt, the loop function remains empty.
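The compare value written to OCR1A in setup() limits how long the measurement interval can be, because Timer1 is a 16-bit counter. The following sketch reproduces the calculation for a 16 MHz clock and a prescaler of 1024, as used above; the constant and function names are hypothetical.

```cpp
#include <cstdint>

constexpr uint32_t CPU_CLOCK_HZ = 16000000UL;  // 16 MHz system clock
constexpr uint32_t PRESCALER = 1024;           // CS12 | CS10
constexpr uint32_t TIMER1_MAX = 65535;         // 16-bit counter limit

// Compare value for one interrupt every periodSeconds (must be >= 1).
constexpr uint32_t compareValue(uint32_t periodSeconds) {
    return static_cast<uint32_t>(
        static_cast<uint64_t>(CPU_CLOCK_HZ) * periodSeconds / PRESCALER - 1);
}

// True if the compare value fits into the 16-bit OCR1A register.
constexpr bool fitsTimer1(uint32_t periodSeconds) {
    return compareValue(periodSeconds) <= TIMER1_MAX;
}

// T_READ = 2 from the sketch yields 31249 timer ticks.
static_assert(compareValue(2) == 31249, "unexpected compare value");
```

With these settings the timer ticks 15625 times per second, so T_READ = 2 gives 31249, and the largest whole-second interval that still fits is 4 seconds (62499). Longer intervals would need a software counter inside the interrupt handler.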

Conclusion

In this example, we implemented a sensor that regularly publishes messages to an MQTT topic. The overall program size is less than 20 kB and the RAM usage is less than 1200 bytes. This leaves enough capacity to extend the functionality if required.

Comments

One response to “Internet of Things Applications with MQTT and nanopb Protocol Buffers”

  1. Carlos Pino says:

    Hi Stefan,

    Great article. It was very helpful.
    I am trying to use nanopb on Arduino to generate the protobuf payload to send data to Kapua. For some reason I am not getting the expected payload. Would you please help me with this?

    Regards,
    Carlos
