MQTT
MQTT
Message Queuing Telemetry Transport
머신 대 머신(M2M) 통신에 사용되는 표준 메시징 프로토콜
MQTT 특징
가볍고 효율적: MQTT 제어 메시지와 메시지 헤더의 크기가 작다.
애플리케이션 계층 프로토콜이다.
Broker, Publisher, Subscriber 모델로 이루어진다.
발행/구독 구조를 사용하여 다대다 통신을 제공한다.
QoS(Quality of Service)
QoS를 3단계로 제공, 최소 1회, 최대 1회 정확히 1회로 신뢰성을 보장한다.
0: 최대 1회 전송, 토픽을 통해 메시지를 전송만 함
1: 최소 1회 전송, 구독자가 메시지를 받았는지 불확실하다면 재전송 함 (기본)
2: 정확히 1회 수신, 구독자가 메시지를 정확히 한 번 수신할 수 있도록 보장 함, 느림
QoS: https://dalkomit.tistory.com/111
Publisher(발행자): 토픽을 발행
Broker(서버): 발행된 토픽이 브로커를 거쳐서 통신
Subscriber(구독자): 토픽을 구독하기 위한 목적으로 브로커에 연결
MQTT 공식
MQTT - The Standard for IoT Messaging
MQTT 배민 Tech
MQTT 적용을 통한 중계시스템 개선 | 우아한형제들 기술블로그 (woowahan.com)
Naver D2, Facebook
Facebook 메신저와 MQTT (naver.com)
MQTT를 위한 Mosquitto 설치 및 테스트
Mosquitto: 이클립스에서 만든 오픈소스 메시지 Broker, publish/subscribe모델을 이용해 경량화 통신 기능을 제공
Download
Source mosquitto-2.0.18.tar.gz (GPG signature) Git source code repository (github.com) Older downloads are available at https://mosquitto.org/files/ Binary Installation The binary packages listed be
mosquitto.org
Mosquitto 버전 확인
Mosquitto 실행 파일
ctrl: Mosquitto broker 초기화 및 설정
passwd: Mosquitto 비밀번호 파일 관리
rr: request/response 클라이언트
pub: publish 클라이언트
sub: subscribe 클라이언트
Broker 실행(-v는 로그 출력 옵션)
mosquitto.exe -v
Subscriber 실행
Publisher 실행
Visual Stdio에서 mosquitto 사용
mosquitto 라이브러리 프로젝트 폴더로 가져오기
작업할 프로젝트 경로에 /includes 디렉토리와 /libraries 디렉토리를 만들고, 헤더 파일과 .lib 파일을 각각 넣어준다.
나머지 .dll 파일은 두 디렉토리의 ../ 경로에 위치시켜준다. (이렇게 안해주면 dll 파일을 mosquitto가 못 찾았다. ㅇㅅㅇ)
Visual studio 프로젝트 속성 설정
1. 디버깅
2. C/C++ - 일반
3. 링커 - 일반
4. 링커 - 입력
Mosquitto를 사용해 MQTT 통신 구현
MQTT 통신 구조
센서 - 브로커 - 서버 - 브로커 - 스위치 - 브로커 - 서버
C++ 코드 (Windows 환경)
- LedSensor.cpp
#include <mosquitto.h>
#include <cstring>
#include <string>
#include <iostream>
#include <Windows.h>
#include <process.h>
using namespace std;
#define mqtt_host "127.0.0.1"
#define mqtt_port 1883
void connect_callback(struct mosquitto* mosq, void* obj, int result)
{
// Todo: 연결을 시도할 경우의 Callback 함수...
}
int main(int argc, char** argv)
{
struct mosquitto* mosq;
mosquitto_lib_init();
char clientid[48]; // nullptr을 사용해도 상관 없음..
int ret = 0;
memset(clientid, NULL, sizeof(clientid));
snprintf(clientid, sizeof(clientid) - 1, "LightSensor", _getpid());
mosq = mosquitto_new(clientid, true, nullptr);
if (mosq)
{
ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10);
if (ret) {
cout << "[ERROR] Can not connect to server\n";
exit(-1);
}
}
else
{
cout << "[ERROR] Can not generate mosquitto object\n";
exit(-1);
}
while (true)
{
string msg;
cout << "room/get/light: ";
cin >> msg;
ret = mosquitto_publish(mosq, NULL, "room/get/light", msg.size() , msg.c_str(), 0, false);
if (ret) {
cout << "[ERROR] Can not publish to server\n";
exit(-1);
}
}
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
- IoT Server.cpp
#include <mosquitto.h>
#include <cstring>
#include <string>
#include <iostream>
#include <Windows.h>
#include <process.h>
#include <Windows.h>
#include <time.h>
using namespace std;
#define mqtt_host "127.0.0.1"
#define mqtt_port 1883
static int run = 1;
void message_callback(struct mosquitto* mosq, void* obj, const struct mosquitto_message* message)
{
//cout << "receive message: " << (char*)(message->payload) << " (" << (message->topic) << ")\n";
if (strcmp(message->topic, "room/get/light") == 0)
{
cout << "Light: " << (char*)(message->payload) << " (" << (message->topic) << ")\n";
int ten = 1;
int light = 0;
for (int i = message->payloadlen - 1; i >= 0; --i)
{
char curr = ((char*)message->payload)[i];
if (curr < '0' || curr > '9') {
cout << "[ERROR] Invalid input!!!\n";
return;
}
light += ten * (curr - '0');
ten *= 10;
}
string msg = (light > 20) ? "ON" : "OFF";
cout << "[Publish] room/put/switch: " << msg << "\n";
int ret = mosquitto_publish(mosq, NULL, "room/put/switch", sizeof(msg), msg.c_str(), 0, false);
if (ret) {
cout << "[ERROR] Can not publish to server\n";
return;
}
}
else if(strcmp(message->topic, "room/get/switch") == 0)
{
char buf[5];
strcpy_s(buf, (char*)message->payload);
if (strcmp(buf, "ON") == 0)
{
cout << "Switch: Turn on complete (" << (message->topic) << ")\n";
}
else if (strcmp(buf, "OFF") == 0)
{
cout << "Switch: Turn off complete (" << (message->topic) << ")\n";
}
else
{
cout << "[ERROR] Invalid input!!!\n";
return;
}
}
}
int main(int argc, char** argv)
{
struct mosquitto* mosq;
mosquitto_lib_init();
char clientid[48]; // nullptr을 사용해도 상관 없음..
int ret = 0;
// 객체를 생성하고 서버에 연결
memset(clientid, NULL, sizeof(clientid));
snprintf(clientid, sizeof(clientid) - 1, "IoT", _getpid());
mosq = mosquitto_new(clientid, true, nullptr);
if (mosq)
{
ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10);
if (ret) {
cout << "[ERROR] Can not connect to server\n";
exit(-1);
}
}
else {
cout << "[ERROR] Failed to generate mosquitto object\n";
exit(-1);
}
// 콜백함수 등록
mosquitto_message_callback_set(mosq, message_callback);
// 구독
mosquitto_subscribe(mosq, NULL, "room/get/light", 0);
mosquitto_subscribe(mosq, NULL, "room/get/switch", 0);
while (run)
{
ret = mosquitto_loop(mosq, -1, 1);
if (run && ret) {
cout << "[ERROR] connection error\n";
Sleep(3000);
mosquitto_reconnect(mosq);
}
}
// 연결 종료, 구조체 소멸, 라이브러리 정리
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
- Switch.cpp
#include <mosquitto.h>
#include <cstring>
#include <string>
#include <iostream>
#include <Windows.h>
#include <process.h>
#include <Windows.h>
#include <time.h>
using namespace std;
#define mqtt_host "127.0.0.1"
#define mqtt_port 1883
static int run = 1;
void message_callback(struct mosquitto* mosq, void* obj, const struct mosquitto_message* message)
{
//cout << "receive message: " << (char*)(message->payload) << " (" << (message->topic) << ")\n";
cout << "IoT: " << (char*)(message->payload) << " (" << (message->topic) << ")\n";
string msg;
if (strcmp((char*)message->payload, "ON") == 0)
{
msg = "ON";
}
else if (strcmp((char*)message->payload, "OFF") == 0)
{
msg = "OFF";
}
else {
cout << "[ERROR] Invalid input\n";
return;
}
cout << "[Publish] room/get/switch: " << msg << "\n";
int ret = mosquitto_publish(mosq, NULL, "room/get/switch", sizeof(msg), msg.c_str(), 0, false);
if (ret) {
cout << "[ERROR] Can not publish to server\n";
return;
}
}
int main(int argc, char** argv)
{
struct mosquitto* mosq;
mosquitto_lib_init();
char clientid[48]; // nullptr을 사용해도 상관 없음..
int ret = 0;
// 객체를 생성하고 서버에 연결
memset(clientid, NULL, sizeof(clientid));
snprintf(clientid, sizeof(clientid) - 1, "Switch", _getpid());
mosq = mosquitto_new(clientid, true, nullptr);
if (mosq)
{
ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10);
if (ret) {
cout << "[ERROR] Can not connect to server\n";
exit(-1);
}
}
else {
cout << "[ERROR] Failed to generate mosquitto object\n";
exit(-1);
}
// 콜백함수 등록
mosquitto_message_callback_set(mosq, message_callback);
// 구독
mosquitto_subscribe(mosq, NULL, "room/put/switch", 0);
while (run)
{
ret = mosquitto_loop(mosq, -1, 1);
if (run && ret) {
cout << "[ERROR] connection error\n";
Sleep(3000);
mosquitto_reconnect(mosq);
}
}
// 연결 종료, 구조체 소멸, 라이브러리 정리
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
실행 결과