In this post we illustrate how to use the nanopb implementation of Google’s Protocol Buffers in combination with the MQTT-based data transmission on a small embedded platform. For simplicity an Arduino Ethernet Rev. 3 has been chosen.
As described earlier communication over the Hypertext Transfer Protocol (HTTP) can be a serious bottleneck for microcontrollers. A considerable alternative is the MQ Telemetry Transport (MQTT), which reduces the overhead dramatically and is more suited for IoT-applications. But MQTT is only a transport protocol and does not define how the data is presented. Therefore one has to define a content protocol upon MQTT that is machine-readable, reliable and small in size.
Google’s Protocol Buffers enable you to encode any information in a format that it is exchangable over platforms and implementation languages, making it therefore suitable for Internet of Things applications.
In this example we want to create a sensor network that collects climate data such as temperature, humidity and air pressure from multiple sensors. Therefore the message prototype is defined as follows. It contains some general information about the sensor and multiple measurements. It can contain data for all three measured variables, but also only for a subset. Each mesurement contains a unix timestamp.
package huygens; option java_package = "com.noser.edays.huygens"; message HuygensMessage { required int32 messageId = 1; required int32 sensorId = 2; required int32 timestamp = 3; message SensorValue { required int32 timestamp = 1; optional float temperature = 2; optional float humidity = 3; optional float pressure = 4; } enum SensorState { ON = 1; OFF = 2; ERROR = 3; } repeated SensorValue sensorValues = 4; required SensorState sensorState = 5; }
After the message definition you need to compile the proto definitions, so that you get C defintion files that you can use in your microcontroller (huygens.pb.h and huygens.pb.c)
user@host:~$ protoc -ohuygens.pb huygens.proto user@host:~$ python nanopb/generator/nanopb_generator.py huygens.pb
The following headers, definitions and global variables are used. For the measurements, a global array for a predefined (MAX_MEASUREMENTS) set of SensorValue submessages is used and filled at each measurement. The mesurement shall be executed all two seconds (T_READ). For the MQTT communication the PubSubClient by Nick O’Leary is used. As this application only sends messages the callback function to handle incoming MQTT messages is left empty.
#include <Arduino.h> #include <pins_arduino.h> #include "Ethernet.h" #include "huygens.pb.h" #include "PubSubClient.h" #include "pb_encode.h" #define MSG_SIZE 128 #define MAX_MEASUREMENTS 5 #define HUYGENS_TOPIC "Huygens" #define MQTT_MAX_PACKET_SIZE 512; #define T_READ 2 #define SENSOR_ID 42 _huygens_HuygensMessage_SensorValue vals[MAX_MEASUREMENTS]; uint8_t cursor = 0; uint32_t msgId = 1; byte _mac[] = {0x90,0xA2,0xDA, 0x0D,0x8E,0xC0}; byte _ip[] = {192,168,1,121}; byte server[] = { 192, 168, 1, 100 }; EthernetClient ethClient; uint32_t timeOffset; void mqtt_callback(char* topic, byte* payload, unsigned int length){ return; } PubSubClient client(server, 1883, mqtt_callback, ethClient);
At each interrupt call the values are read from the sensors using the helper functions getHumidity(), getTemperature() and getPressure(), i.e. wrappers for AnalogRead() with some scaling, and stored in the next array element of the global measurement buffer vals[]. The timestamp is calculated using the helper function getTimestamp(), which is basically a wrapper for millis() and adds the offset to get a correct unix timestamp.
After the measurement the cursor is shifted for the next measurement. If the buffer is full, the message is sent using sendMessage() and the cursor is reset to the first element.
ISR(TIMER1_COMPA_vect){ Serial.println(F("Reading Sensors")); vals[cursor].timestamp = getTimestamp(); float h = getHumidity(); float t = getTemperature(); float p = getPressure(); if (!isnan(t)) { vals[cursor].has_temperature = true; vals[cursor].temperature = t; } if (!isnan(h)) { vals[cursor].has_humidity = true; vals[cursor].humidity = h; } if (!isnan(p)) { vals[cursor].has_pressure = true; vals[cursor].pressure = p; } if (cursor < MAX_MEASUREMENTS - 1) { ++cursor; } else { cursor = 0; sendMessage(); } }
When a message is sent first a message struct msg as well as the the output stream (i.e. the buffer array sMsg) are initialised. Then the header fields are filled and the message is encoded. Afterwards the message is published to the MQTT broker. If that suceeded the measurement buffer is reset and the message ID incremented for the following message.
void sendMessage() { Serial.println(F("Sending Message")); _huygens_HuygensMessage msg; uint8_t sMsg[MSG_SIZE]; pb_ostream_t buffer = pb_ostream_from_buffer(sMsg, sizeof(sMsg)); msg.timestamp = getTimestamp(); msg.sensorId = SENSOR_ID; msg.messageId = msgId; msg.sensorState = huygens_HuygensMessage_SensorState_ON; //Map the callback function to encode the submessages for sensorValues msg.sensorValues.funcs.encode = &sensorValues_callback; msg.sensorValues.arg = NULL; //Encode the message if (!pb_encode(&buffer, huygens_HuygensMessage_fields, &msg)) { Serial.println(F("Encoding failed")); Serial.println(PB_GET_ERROR(&buffer)); } //Publish everything to the MQTT broker if (!client.publish(HUYGENS_TOPIC, sMsg, buffer.bytes_written)) { Serial.println(F("Publishing Failed")); } else { resetValues(); ++msgId; } }
As seen, to encode the submessages a callback function is required. In our case, the measurement buffer is iterated and each measurement is encoded to the sending buffer of the parent message.
bool sensorValues_callback(pb_ostream_t *stream, const pb_field_t *field, void * const *arg) { for (int i=0; i<MAX_MEASUREMENTS; ++i) { if (vals[i].timestamp != 0) { if (!pb_encode_tag_for_field(stream, field)) return false; if (!pb_encode_submessage(stream, huygens_HuygensMessage_SensorValue_fields, &vals[i])) return false; } } return true; }
To reset the measurement buffer after sending each field is set to zero. Also, the cursor is reset to the first element.
void resetValues() { cursor = 0; for (int i=0;i<MAX_MEASUREMENTS;i++) { vals[i].timestamp = 0; vals[i].has_humidity = false; vals[i].has_pressure = false; vals[i].has_temperature = false; vals[i].humidity = 0; vals[i].pressure = 0; vals[i].temperature = 0; } }
The setup function is quite simple and initialises all the required stuff. As the messages require Unix timestamp, the current time is fetched from an NTP server using ntpUnixTime() as described on the Arduino Playground and stored in the timeOffset. secs() is a small helper function that measures the elapsed time in seconds since startup, i.e. millis()/1000.
void setup() { Serial.begin(9600); Serial.println(F("Starting Ethernet")); Ethernet.begin(_mac, _ip); Serial.println(F("Connecting to MQTT Broker")); if (!client.connect("HuygensArduino")){ Serial.println(F("Connection failed")); } Serial.println(F("Getting NTP time Offset:")); EthernetUDP udp; timeOffset = ntpUnixTime(udp) - secs(); Serial.println(timeOffset); Serial.println(F("Setting up Clock Interrupt")); // Clock Interrupt setup cli(); TCCR1A = 0;// set entire TCCR1A register to 0 TCCR1B = 0;// same for TCCR1B TCNT1 = 0;//initialize counter value to 0 // set compare match register for 1hz increments OCR1A = 16000000 * T_READ / 1024 - 1;// (must be <65536) // turn on CTC mode TCCR1B |= (1 << WGM12); // Set CS12 and CS10 bits for 1024 prescaler TCCR1B |= (1 << CS12) | (1 << CS10); // enable timer compare interrupt TIMSK1 |= (1 << OCIE1A); sei(); Serial.println(F("Arduino Ready")); } void loop() { }
As all the stuff is triggered by the clock interrupt, the loop function remains empty.
In this example, we implemented a sensor that regularly publishes messages to a MQTT topic. The overall program size is less than 20kB and the RAM usage is less than 1200 Bytes. This leaves enough capacity to increase the functionality if required.
Hi Stefan,
Great article. was very helpfull.
I am trying to use nanopb in arduino to generate the protobuf payload to send data to Kapua. Forsome reason I am not getting the expected payload. Would you please help me with this?
Regards,
Carlos