래울 2024. 6. 26. 22:32

MQTT

Message Queuing Telemetry Transport

머신 대 머신(M2M) 통신에 사용되는 표준 메시징 프로토콜

 

MQTT 특징

가볍고 효율적: MQTT 제어 메시지와 메시지 헤더의 크기가 작다.

애플리케이션 계층 프로토콜이다.

Broker, Publisher, Subscriber 모델로 이루어진다.

발행/구독 구조를 사용하여 다대다 통신을 제공한다.

 

QoS(Quality of Service)

QoS를 3단계로 제공, 최소 1회, 최대 1회 정확히 1회로 신뢰성을 보장한다.

0: 최대 1회 전송, 토픽을 통해 메시지를 전송만 함

1: 최소 1회 전송, 구독자가 메시지를 받았는지 불확실하다면 재전송 함 (기본)

2: 정확히 1회 수신, 구독자가 메시지를 정확히 한 번 수신할 수 있도록 보장 함, 느림

QoS: https://dalkomit.tistory.com/111

 

Publisher(발행자): 토픽을 발행

Broker(서버): 발행된 토픽이 브로커를 거쳐서 통신

Subscriber(구독자): 토픽을 구독하기 위한 목적으로 브로커에 연결

 

MQTT 공식

MQTT - The Standard for IoT Messaging

 

MQTT 배민 Tech

MQTT 적용을 통한 중계시스템 개선 | 우아한형제들 기술블로그 (woowahan.com)

 

Naver D2, Facebook

Facebook 메신저와 MQTT (naver.com)


MQTT를 위한 Mosquitto 설치 및 테스트

Mosquitto: 이클립스에서 만든 오픈소스 메시지 Broker, publish/subscribe모델을 이용해 경량화 통신 기능을 제공

Download | Eclipse Mosquitto

 

Download

Source mosquitto-2.0.18.tar.gz (GPG signature) Git source code repository (github.com) Older downloads are available at https://mosquitto.org/files/ Binary Installation The binary packages listed be

mosquitto.org

 

Mosquitto 버전 확인

 

 

Mosquitto 실행 파일

ctrl: Mosquitto broker 초기화 및 설정

passwd: Mosquitto 비밀번호 파일 관리

rr: request/response 클라이언트

pub: publish 클라이언트

sub: subscribe 클라이언트

 

Broker 실행(-v는 로그 출력 옵션)

mosquitto.exe -v

log 메시지


Subscriber 실행

 

Publisher 실행

 


Visual Stdio에서 mosquitto 사용

mosquitto 라이브러리 프로젝트 폴더로 가져오기

작업할 프로젝트 경로에 /includes 디렉토리와 /libraries 디렉토리를 만들고, 헤더 파일과 .lib 파일을 각각 넣어준다.

나머지 .dll 파일은 두 디렉토리의 ../ 경로에 위치시켜준다. (이렇게 안해주면 dll 파일을 mosquitto가 못 찾았다. ㅇㅅㅇ)

 

 

Visual studio 프로젝트 속성 설정

1. 디버깅

 

2. C/C++ - 일반

 

3. 링커 - 일반

 

4. 링커 - 입력


Mosquitto를 사용해 MQTT 통신 구현

MQTT 통신 구조

센서 - 브로커 - 서버 - 브로커 -  스위치 - 브로커 - 서버

 

 

C++ 코드 (Windows 환경)

- LedSensor.cpp

#include <mosquitto.h>
#include <cstring>
#include <string>
#include <iostream>
#include <Windows.h>
#include <process.h>
using namespace std;

#define	mqtt_host "127.0.0.1"
#define	mqtt_port 1883

void connect_callback(struct mosquitto* mosq, void* obj, int result)
{
    // Todo: 연결을 시도할 경우의 Callback 함수...
}

int main(int argc, char** argv)
{
    struct mosquitto* mosq;
    mosquitto_lib_init();
    char clientid[48];	// nullptr을 사용해도 상관 없음..
    int ret = 0;

    memset(clientid, NULL, sizeof(clientid));
    snprintf(clientid, sizeof(clientid) - 1, "LightSensor", _getpid());
    mosq = mosquitto_new(clientid, true, nullptr);
    if (mosq)
    {
        ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10);
        if (ret) {
            cout << "[ERROR] Can not connect to server\n";
            exit(-1);
        }
    }
    else
    {
        cout << "[ERROR] Can not generate mosquitto object\n";
        exit(-1);
    }

    while (true)
    {
        string msg;
        cout << "room/get/light: ";
        cin >> msg;
        ret = mosquitto_publish(mosq, NULL, "room/get/light", msg.size() , msg.c_str(), 0, false);
        if (ret) {
            cout << "[ERROR] Can not publish to server\n";
            exit(-1);

        }
    }

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}

 

- IoT Server.cpp

#include <mosquitto.h>
#include <cstring>
#include <string>
#include <iostream>
#include <Windows.h>
#include <process.h>
#include <Windows.h>
#include <time.h>
using namespace std;

#define	mqtt_host "127.0.0.1"
#define	mqtt_port 1883

static int run = 1;

void message_callback(struct mosquitto* mosq, void* obj, const struct mosquitto_message* message)
{
    //cout << "receive message: " << (char*)(message->payload) << " (" << (message->topic) << ")\n";
    if (strcmp(message->topic, "room/get/light") == 0)
    {
        cout << "Light: " << (char*)(message->payload) << " (" << (message->topic) << ")\n";
        int ten = 1;
        int light = 0;
        for (int i = message->payloadlen - 1; i >= 0; --i)
        {
            char curr = ((char*)message->payload)[i];
            if (curr < '0' || curr > '9') {
                cout << "[ERROR] Invalid input!!!\n";
                return;
            }
            light += ten * (curr - '0');
            ten *= 10;
        }

        string msg = (light > 20) ? "ON" : "OFF";
        cout << "[Publish] room/put/switch: " << msg << "\n";
        int ret = mosquitto_publish(mosq, NULL, "room/put/switch", sizeof(msg), msg.c_str(), 0, false);
        if (ret) {
            cout << "[ERROR] Can not publish to server\n";
            return;
        }
    }
    else if(strcmp(message->topic, "room/get/switch") == 0)
    {
        char buf[5];
        strcpy_s(buf, (char*)message->payload);
        if (strcmp(buf, "ON") == 0)
        {
            cout << "Switch: Turn on complete (" << (message->topic) << ")\n";
        }
        else if (strcmp(buf, "OFF") == 0)
        {
            cout << "Switch: Turn off complete (" << (message->topic) << ")\n";
        }
        else
        {
            cout << "[ERROR] Invalid input!!!\n";
            return;
        }
    }
}

int main(int argc, char** argv)
{
    struct mosquitto* mosq;
    mosquitto_lib_init();
    char clientid[48];	// nullptr을 사용해도 상관 없음..
    int ret = 0;

    // 객체를 생성하고 서버에 연결
    memset(clientid, NULL, sizeof(clientid));
    snprintf(clientid, sizeof(clientid) - 1, "IoT", _getpid());
    mosq = mosquitto_new(clientid, true, nullptr);
    if (mosq)
    {
        ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10);
        if (ret) {
            cout << "[ERROR] Can not connect to server\n";
            exit(-1);
        }
    }
    else {
        cout << "[ERROR] Failed to generate mosquitto object\n";
        exit(-1);
    }

    // 콜백함수 등록
    mosquitto_message_callback_set(mosq, message_callback);

    // 구독
    mosquitto_subscribe(mosq, NULL, "room/get/light", 0);
    mosquitto_subscribe(mosq, NULL, "room/get/switch", 0);

    while (run)
    {
        ret = mosquitto_loop(mosq, -1, 1);
        if (run && ret) {
            cout << "[ERROR] connection error\n";
            Sleep(3000);
            mosquitto_reconnect(mosq);
        }
    }

    // 연결 종료, 구조체 소멸, 라이브러리 정리
    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}

 

- Switch.cpp

#include <mosquitto.h>
#include <cstring>
#include <string>
#include <iostream>
#include <Windows.h>
#include <process.h>
#include <Windows.h>
#include <time.h>
using namespace std;

#define	mqtt_host "127.0.0.1"
#define	mqtt_port 1883

static int run = 1;

void message_callback(struct mosquitto* mosq, void* obj, const struct mosquitto_message* message)
{
    //cout << "receive message: " << (char*)(message->payload) << " (" << (message->topic) << ")\n";
    cout << "IoT: " << (char*)(message->payload) << " (" << (message->topic) << ")\n";
    string msg;
    if (strcmp((char*)message->payload, "ON") == 0)
    {
        msg = "ON";
    }
    else if (strcmp((char*)message->payload, "OFF") == 0)
    {
        msg = "OFF";
    }
    else {
        cout << "[ERROR] Invalid input\n";
        return;
    }
    cout << "[Publish] room/get/switch: " << msg << "\n";
    int ret = mosquitto_publish(mosq, NULL, "room/get/switch", sizeof(msg), msg.c_str(), 0, false);
    if (ret) {
        cout << "[ERROR] Can not publish to server\n";
        return;
    }
}

int main(int argc, char** argv)
{
    struct mosquitto* mosq;
    mosquitto_lib_init();
    char clientid[48];	// nullptr을 사용해도 상관 없음..
    int ret = 0;

    // 객체를 생성하고 서버에 연결
    memset(clientid, NULL, sizeof(clientid));
    snprintf(clientid, sizeof(clientid) - 1, "Switch", _getpid());
    mosq = mosquitto_new(clientid, true, nullptr);
    if (mosq)
    {
        ret = mosquitto_connect(mosq, mqtt_host, mqtt_port, 10);
        if (ret) {
            cout << "[ERROR] Can not connect to server\n";
            exit(-1);
        }
    }
    else {
        cout << "[ERROR] Failed to generate mosquitto object\n";
        exit(-1);
    }

    // 콜백함수 등록
    mosquitto_message_callback_set(mosq, message_callback);

    // 구독
    mosquitto_subscribe(mosq, NULL, "room/put/switch", 0);

    while (run)
    {
        ret = mosquitto_loop(mosq, -1, 1);
        if (run && ret) {
            cout << "[ERROR] connection error\n";
            Sleep(3000);
            mosquitto_reconnect(mosq);
        }
    }

    // 연결 종료, 구조체 소멸, 라이브러리 정리
    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}

 

 

실행 결과

각각 exe 실행
결과 화면