Message Queuing Telemetry Transport
머신 대 머신(M2M) 통신에 사용되는 표준 메시징 프로토콜
가볍고 효율적: MQTT 제어 메시지와 메시지 헤더의 크기가 작다.
애플리케이션 계층 프로토콜이다.
Broker, Publisher, Subscriber 모델로 이루어진다.
발행/구독 구조를 사용하여 다대다 통신을 제공한다.
QoS(Quality of Service)
QoS를 3단계로 제공, 최소 1회, 최대 1회 정확히 1회로 신뢰성을 보장한다.
0: 최대 1회 전송, 토픽을 통해 메시지를 전송만 함
1: 최소 1회 전송, 구독자가 메시지를 받았는지 불확실하다면 재전송 함 (기본)
2: 정확히 1회 수신, 구독자가 메시지를 정확히 한 번 수신할 수 있도록 보장 함, 느림
QoS: https://dalkomit.tistory.com/111
Publisher(발행자): 토픽을 발행
Broker(서버): 발행된 토픽이 브로커를 거쳐서 통신
Subscriber(구독자): 토픽을 구독하기 위한 목적으로 브로커에 연결
MQTT - The Standard for IoT Messaging
MQTT 배민 Tech
MQTT 적용을 통한 중계시스템 개선 | 우아한형제들 기술블로그 (woowahan.com)
Naver D2, Facebook
Facebook 메신저와 MQTT (naver.com)
MQTT를 위한 Mosquitto 설치 및 테스트
Mosquitto: 이클립스에서 만든 오픈소스 메시지 Broker, publish/subscribe모델을 이용해 경량화 통신 기능을 제공
Mosquitto: 이클립스에서 만든 오픈소스 메시지 Broker, publish/subscribe모델을 이용해 경량화 통신 기능을 제공
Mosquitto 버전 확인
Mosquitto 실행 파일
ctrl: Mosquitto broker 초기화 및 설정
passwd: Mosquitto 비밀번호 파일 관리
rr: request/response 클라이언트
pub: publish 클라이언트
sub: subscribe 클라이언트
Broker 실행(-v는 로그 출력 옵션)
mosquitto.exe -v
log 메시지
Subscriber 실행Publisher 실행
Visual Stdio에서 mosquitto 사용
mosquitto 라이브러리 프로젝트 폴더로 가져오기
작업할 프로젝트 경로에 /includes 디렉토리와 /libraries 디렉토리를 만들고, 헤더 파일과 .lib 파일을 각각 넣어준다.
나머지 .dll 파일은 두 디렉토리의 ../ 경로에 위치시켜준다. (이렇게 안해주면 dll 파일을 mosquitto가 못 찾았다. ㅇㅅㅇ)
Visual studio 프로젝트 속성 설정
1. 디버깅
2. C/C++ - 일반
3. 링커 - 일반
4. 링커 - 입력
Mosquitto를 사용해 MQTT 통신 구현
MQTT 통신 구조
센서 - 브로커 - 서버 - 브로커 - 스위치 - 브로커 - 서버
C++ 코드 (Windows 환경)
- LedSensor.cpp
#include <mosquitto.h> #include <cstring> #include <string> #include <iostream> #include <Windows.h> #include <process.h> using namespace std; #define mqtt_host "" #define mqtt_port 1883 void connect_callback(struct mosquitto* mosq, void* obj, int result) { // Todo: 연결을 시도할 경우의 Callback 함수... } int main(int argc, char** argv) { struct mosquitto* mosq; mosquitto_lib_init(); char clientid[48]; // nullptr을 사용해도 상관 없음.. int ret = 0; memset(clientid, NULL, sizeof(clientid)); snprintf(clientid, sizeof(clientid) - 1, "LightSensor", _getpid()); mosq = mosquitto_new(clientid, true, nullptr); if (mosq) { ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10); if (ret) { cout << "[ERROR] Can not connect to server\n"; exit(-1); } } else { cout << "[ERROR] Can not generate mosquitto object\n"; exit(-1); } while (true) { string msg; cout << "room/get/light: "; cin >> msg; ret = mosquitto_publish(mosq, NULL, "room/get/light", msg.size() , msg.c_str(), 0, false); if (ret) { cout << "[ERROR] Can not publish to server\n"; exit(-1); } } mosquitto_disconnect(mosq); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }
- IoT Server.cpp
#include <mosquitto.h> #include <cstring> #include <string> #include <iostream> #include <Windows.h> #include <process.h> #include <Windows.h> #include <time.h> using namespace std; #define mqtt_host "" #define mqtt_port 1883 static int run = 1; void message_callback(struct mosquitto* mosq, void* obj, const struct mosquitto_message* message) { //cout << "receive message: " << (char*)(message->payload) << " (" << (message->topic) << ")\n"; if (strcmp(message->topic, "room/get/light") == 0) { cout << "Light: " << (char*)(message->payload) << " (" << (message->topic) << ")\n"; int ten = 1; int light = 0; for (int i = message->payloadlen - 1; i >= 0; --i) { char curr = ((char*)message->payload)[i]; if (curr < '0' || curr > '9') { cout << "[ERROR] Invalid input!!!\n"; return; } light += ten * (curr - '0'); ten *= 10; } string msg = (light > 20) ? "ON" : "OFF"; cout << "[Publish] room/put/switch: " << msg << "\n"; int ret = mosquitto_publish(mosq, NULL, "room/put/switch", sizeof(msg), msg.c_str(), 0, false); if (ret) { cout << "[ERROR] Can not publish to server\n"; return; } } else if(strcmp(message->topic, "room/get/switch") == 0) { char buf[5]; strcpy_s(buf, (char*)message->payload); if (strcmp(buf, "ON") == 0) { cout << "Switch: Turn on complete (" << (message->topic) << ")\n"; } else if (strcmp(buf, "OFF") == 0) { cout << "Switch: Turn off complete (" << (message->topic) << ")\n"; } else { cout << "[ERROR] Invalid input!!!\n"; return; } } } int main(int argc, char** argv) { struct mosquitto* mosq; mosquitto_lib_init(); char clientid[48]; // nullptr을 사용해도 상관 없음.. int ret = 0; // 객체를 생성하고 서버에 연결 memset(clientid, NULL, sizeof(clientid)); snprintf(clientid, sizeof(clientid) - 1, "IoT", _getpid()); mosq = mosquitto_new(clientid, true, nullptr); if (mosq) { ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10); if (ret) { cout << "[ERROR] Can not connect to server\n"; exit(-1); } } else { cout << "[ERROR] Failed to generate mosquitto object\n"; exit(-1); } // 콜백함수 등록 mosquitto_message_callback_set(mosq, message_callback); // 구독 mosquitto_subscribe(mosq, NULL, "room/get/light", 0); mosquitto_subscribe(mosq, NULL, "room/get/switch", 0); while (run) { ret = mosquitto_loop(mosq, -1, 1); if (run && ret) { cout << "[ERROR] connection error\n"; Sleep(3000); mosquitto_reconnect(mosq); } } // 연결 종료, 구조체 소멸, 라이브러리 정리 mosquitto_disconnect(mosq); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }
- Switch.cpp
#include <mosquitto.h> #include <cstring> #include <string> #include <iostream> #include <Windows.h> #include <process.h> #include <Windows.h> #include <time.h> using namespace std; #define mqtt_host "" #define mqtt_port 1883 static int run = 1; void message_callback(struct mosquitto* mosq, void* obj, const struct mosquitto_message* message) { //cout << "receive message: " << (char*)(message->payload) << " (" << (message->topic) << ")\n"; cout << "IoT: " << (char*)(message->payload) << " (" << (message->topic) << ")\n"; string msg; if (strcmp((char*)message->payload, "ON") == 0) { msg = "ON"; } else if (strcmp((char*)message->payload, "OFF") == 0) { msg = "OFF"; } else { cout << "[ERROR] Invalid input\n"; return; } cout << "[Publish] room/get/switch: " << msg << "\n"; int ret = mosquitto_publish(mosq, NULL, "room/get/switch", sizeof(msg), msg.c_str(), 0, false); if (ret) { cout << "[ERROR] Can not publish to server\n"; return; } } int main(int argc, char** argv) { struct mosquitto* mosq; mosquitto_lib_init(); char clientid[48]; // nullptr을 사용해도 상관 없음.. int ret = 0; // 객체를 생성하고 서버에 연결 memset(clientid, NULL, sizeof(clientid)); snprintf(clientid, sizeof(clientid) - 1, "Switch", _getpid()); mosq = mosquitto_new(clientid, true, nullptr); if (mosq) { ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10); if (ret) { cout << "[ERROR] Can not connect to server\n"; exit(-1); } } else { cout << "[ERROR] Failed to generate mosquitto object\n"; exit(-1); } // 콜백함수 등록 mosquitto_message_callback_set(mosq, message_callback); // 구독 mosquitto_subscribe(mosq, NULL, "room/put/switch", 0); while (run) { ret = mosquitto_loop(mosq, -1, 1); if (run && ret) { cout << "[ERROR] connection error\n"; Sleep(3000); mosquitto_reconnect(mosq); } } // 연결 종료, 구조체 소멸, 라이브러리 정리 mosquitto_disconnect(mosq); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }
실행 결과
