Integrating with MQTT: From First Steps to Production-Ready Architectures
This article explains how to integrate RedBeam with MQTT — from a simple proof-of-concept to a robust, production-ready setup.
MQTT (Message Queuing Telemetry Transport) has become one of the most popular lightweight messaging protocols for IoT, event-driven systems, and distributed applications. Its publish/subscribe model, small footprint, and resilience in unreliable networks make it a perfect choice when you need to move data between services.
It is also attractive from a security perspective as it does not require opening up any ports on the receiving side, nor requires hosting an http endpoint, with all the challenges of authentication.
But while MQTT is easy to get started with, building a production-ready integration requires some careful choices — especially around client IDs, session durations, and scaling with $share.
In this article, we’ll walk through the journey:
- Exploring messages with MQTTX
- A simple proof-of-concept (POC) client
- Adding Horizontal Scaling with shared subscriptions ($share)
- Running geo-redundant clusters in parallel
Examples are provided in both Node.js and C#.
First Things First
RedBeam has support for the creation of powerful workflows, allowing you to configure your own condition for forwarding information to your system. Before you can consume any data from our MQTT service, you first need to set it up.
The Workflows page allows you to create a new workflow. For initial development we recommend keeping it simple: a single workflow that whenever any item is changed in any way, forward a copy of that item over MQTT.
As soon as you add the “Forward Over MQTT” action to the workflow, it will display the hostname and topic.
You can also choose between Secure Web Sockets (WSS) and SSL over TCP.
It will also generate a unique username for your integration. You cannot manually change the username, but you should provide a password. Make sure to create a secure password, and store that information appropriately. We are unable to retrieve your password. You can reset it through the same screen, but any existing integrations will need to be updated with the new password.
Once you have your integration operational, our support team is available to help refine your workflow configuration.
Step 0: Exploring MQTT Messages with MQTTX
Before writing code, it’s useful to see what’s actually flowing through an MQTT broker. A great tool for this is MQTTX.
MQTTX provides a web and desktop client with which you can:
- Connect to our broker using a chosen client ID.
- Validate your connection credentials
- Subscribe to topics (including wildcards).
- Inspect live message payloads
Think of it as your MQTT "multimeter" — invaluable for debugging and exploration before you start, and while you are coding.
No changes are necessary to the Advanced or Last WIll and Testament sections.
After entering the information, click Connect, and add a New Subscription. We recommend switching from Plain Text to JSON on the message section.
If you now make a change to any item or asset in RedBeam, you should see a message come through on the MQTTX webpage.
Step 1: A Basic POC with a Single Client
The simplest integration pattern is a single consumer subscribing directly to a topic. This is suitable for prototypes and proofs of concept, where the focus is on demonstrating value rather than building resilience.
The Role of Session Duration
When you connect to a broker, you can configure a non-zero session duration (or "session expiry interval"). This tells the broker how long it should retain:
- Subscriptions
- Unacknowledged messages
If the client goes offline and reconnects with the same client ID, the broker can deliver any queued messages that arrived while it was away.
This is useful for short disruptions — e.g., restarting your service, or while you hit a breakpoint during debugging.
⚠️ Don’t rely on this as a long-term queue. MQTT is not designed to be durable storage. For production systems, you should process messages as they arrive, not days later.
POC Code Examples
Node.js (using mqtt package)
const mqtt = require("mqtt");
const clientId = "poc-client"; // must stay consistent for buffering
const client = mqtt.connect("mqtt://app-mqtt.redbeam.com", {
clientId,
username: "demoUser", // please secure
password: "demoPass", // credentials appropriately
clean: false, // required for session persistence
properties: {
// keep session for 1 hour when offline
sessionExpiryInterval: 3600
}
});
client.on("connect", () => {
console.log("Connected (POC)");
client.subscribe("item/item/1234567890/+", { qos: 1 });
});
client.on("message", (topic, message) => {
console.log(`Received on ${topic}: ${message.toString()}`);
});
C# (using MQTTnet)
using MQTTnet;
using MQTTnet.Client;
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithClientId("poc-client") // must stay consistent
.WithCredentials("demoUser", "demoPass") // secure appropriately
.WithCleanSession(false) // enable session persistence
.WithSessionExpiryInterval(3600) // keep session for 1 hour
.WithTcpServer("app-mqtt.redbeam.com")
.Build();
client.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"Received on {e.ApplicationMessage.Topic}: {e.ApplicationMessage.ConvertPayloadToString()}");
return Task.CompletedTask;
};
await client.ConnectAsync(options);
await client.SubscribeAsync("item/item/1234567890/+", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
Step 2: Scaling Out with $share
As your application moves into production, you need to handle higher volumes and ensure fault tolerance. MQTT provides a built-in way to achieve horizontal scaling: shared subscriptions.
A shared subscription uses the special $share/{groupName}/{topic} syntax. For example:
$share/myServiceGroup/item/item/1234567890/+
Here’s what happens:
- Multiple service instances can subscribe with the same $share group.
- Messages published to the topic are load balanced across subscribers in a round-robin fashion.
- This allows you to scale horizontally by simply adding more instances of your service.
Fail-Over Considerations
To ensure smooth fail-over:
- Set session duration to 0 (clean session = true). Otherwise, messages may pile up for dead clients instead of going to healthy ones.
- Use unique client IDs. Each instance must have its own ID, ideally generated fresh at startup.
$share Code Examples
Node.js
const mqtt = require("mqtt");
// Generate unique client ID for each service instance
const clientId = "service-" + Math.random().toString(16).substr(2, 8);
const client = mqtt.connect("mqtt://app-mqtt.redbeam.com", {
clientId,
username: "demoUser",
password: "demoPass",
clean: true, // session duration = 0
});
client.on("connect", () => {
console.log("Connected (shared subscription)");
client.subscribe(
" $share/myServiceGroup/item/item/1234567890/+",
{ qos:1 }
);
});
client.on("message", (topic, message) => {
console.log(`[${clientId}] ${topic}: ${message.toString()}`);
});
C#
using MQTTnet;
using MQTTnet.Client;
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
// Unique client ID each time
var clientId = "service-" + Guid.NewGuid().ToString("N");
var options = new MqttClientOptionsBuilder()
.WithClientId(clientId)
.WithCredentials("demoUser", "demoPass")
.WithCleanSession(true) // session duration = 0
.WithTcpServer("app-mqtt.redbeam.com")
.Build();
client.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"[{clientId}] {e.ApplicationMessage.Topic}: {e.ApplicationMessage.ConvertPayloadToString()}");
return Task.CompletedTask;
};
await client.ConnectAsync(options);
await client.SubscribeAsync("$share/myServiceGroup/item/item/1234567890/+", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
Step 3: Geo-Redundant Clusters with Multiple $shares
Shared subscriptions aren’t just for horizontal scaling — you can also run geo-redundant clusters in parallel.
For example:
- $share/clusterA/item/item/1234567890/+
- $share/clusterB/item/item/1234567890/+
Both clusters get the same messages, and each balances load among its own local services. This enables active-active multi-region deployments, improving both resilience and performance.
⚠️Geo redundancy is not required for a production environment
Geo-Redundancy Code Examples
The only change is the $share group name — you’d deploy the same service code in different regions with different $share groups.
Node.js
const mqtt = require("mqtt");
const clientId = "geo-" + Math.random().toString(16).substr(2, 8);
const client = mqtt.connect("mqtt://app-mqtt.redbeam.com", {
clientId,
username: "demoUser",
password: "demoPass",
clean: true,
});
const clusterGroup = process.env.CLUSTER_NAME || "clusterA";
const topic = `$share/${clusterGroup}/item/item/1234567890/+`;
client.on("connect", () => {
console.log(`Connected (${clusterGroup})`);
client.subscribe(topic, { qos: 1 });
});
client.on("message", (topic, message) => {
console.log(`[${clusterGroup}] ${topic}: ${message.toString()}`);
});
C#
using MQTTnet;
using MQTTnet.Client;
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
var clientId = "geo-" + Guid.NewGuid().ToString("N");
var clusterGroup = Environment.GetEnvironmentVariable("CLUSTER_NAME") ?? "clusterA";
var options = new MqttClientOptionsBuilder()
.WithClientId(clientId)
.WithCredentials("demoUser", "demoPass")
.WithCleanSession(true)
.WithTcpServer("app-mqtt.redbeam.com")
.Build();
client.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"[{clusterGroup}]
{e.ApplicationMessage.Topic}: {e.ApplicationMessage.ConvertPayloadToString()}");
return Task.CompletedTask;
};
await client.ConnectAsync(options);
await client.SubscribeAsync($"$share/{clusterGroup}/item/item/1234567890/+", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
Key Takeaways
- Start simple: Use tools like MQTTX to explore your data and validate assumptions.
- POC approach: A single client with a stable client ID and non-zero session duration works fine for short offline buffering.
- Production: Scale out with $share: Shared subscriptions enable round-robin processing, horizontal scaling, and fail-over. Set session duration to 0 and always use unique client IDs.
- Geo-redundancy: Multiple $share groups allow parallel consumption across regions for high availability.
- Client IDs are critical:
- POC = consistent client ID (to enable session buffering).
- Production = unique client IDs (regenerated at startup for scaling and reliability).
- POC = consistent client ID (to enable session buffering).
By following these patterns, you can confidently evolve an MQTT integration from a simple prototype into a robust, horizontally scalable, geo-redundant production system.