Разговариваем с датчиками на человеческом: как связать MQTT, TimescaleDB и LLM через Model Context Protocol (MCP)

Любой, кто когда-либо работал на фабрике или в автоматизированных коммерческих помещениях, хорошо знаком с этой болью: данные датчиков занимают гигабайты, но чтобы извлечь из них хоть какую-то пользу, нужно пройти семь кругов ада. Надо писать SQL-запросы, загружать данные в CSV, переводить на Python, составлять графики в Excel, внимательно просматривать и искать аномалии…

Что делать, если вам нужно быстро понять, почему влажность подскочила в три часа ночи в прошлый вторник? Инженер тратит на это полдня. У завода есть данные, а современные большие языковые модели (LLM) обладают отличными аналитическими “мозгами”. Но долгое время они не понимали друг друга. Передавать терабайты необработанных логов напрямую в контекст нейронной сети дорого и бессмысленно.

Решение появилось недавно, и называется оно Model Context Protocol (MCP). Это открытый стандарт, разработанный компанией Anthropic (и активно поддерживаемый Google, Microsoft и другими гигантами), который позволяет LLM безопасно и стандартизированно вызывать “инструменты” во внешнем мире.

Разговариваем с датчиками на человеческом: как связать MQTT, TimescaleDB и LLM через Model Context Protocol (MCP) - 1

В нашем случае таким инструментом будет доступ к базе данных в режиме реального времени. Давайте изменим привычный порядок вещей. Вместо скучной теории я сначала покажу вам, как это работает на практике, а затем мы шаг за шагом разберемся, как сделать этот мост своими руками.

Как это выглядит для инженера (Демо)

Представьте, что на объекте стоят два датчика, которые измеряют температуру и влажность. Данные с них непрерывно пишутся в базу. Вместо написания скриптов дежурный инженер просто открывает чат с AI-ассистентом (это может быть Claude, OpenCode или любой клиент с поддержкой MCP) и пишет обычным текстом:

Инженер: «Посмотри показания датчиков с температурой и влажностью и покажи максимальные и минимальные значения за 3 дня».

Нейросеть сама понимает, что для этого нужно сделать SQL-запрос к TimescaleDB, вызывает наш MCP-инструмент, получает сжатый ответ и выдает аккуратную сводку:

Максимальные и минимальные значения датчиков

Максимальные и минимальные значения датчиков

Инженер: «Нарисуй гистограмму изменений температуры».

Ассистент мгновенно строит наглядный график прямо в интерфейсе чата:

Гистограмма изменений температуры

Гистограмма изменений температуры

Аналогично по влажности с нужным шагом:

Инженер: «Нарисуй гистограмму изменений влажности с шагом в 10 минут».

Гистограмма изменений влажности

Гистограмма изменений влажности

И, наконец, сложная аналитика, на которую у человека ушло бы много времени:

Инженер: «Проанализируй корреляцию при изменении значений температуры и влажности по времени».

Корреляция при изменении значений температуры и влажности

Корреляция при изменении значений температуры и влажности

AI-инженер работает безупречно: он неутомим, бесстрастен и очень быстр. За секунды он просеивает миллионы точек телеметрии и выдает готовые выводы.

Хотите себе такой же инструмент? Давайте соберем его. Наша цепочка данных будет выглядеть так: Датчик → Mosquitto (MQTT) → Telegraf → TimescaleDB → MCP-сервер → LLM.

В качестве серверной платформы мы использовали сервер на Ubuntu. Пройдемся по всем этапам настройки.

Шаг 1. Настраиваем хранилище: PostgreSQL + TimescaleDB

Обычный Postgres под большой нагрузкой от IoT-датчиков начинает «грустить». Поэтому мы используем расширение TimescaleDB — оно превращает Postgres в мощную базу данных для временных рядов (time-series), автоматически разбивая таблицы на партиции (гипертаблицы) по времени.

Для удобства и изоляции развернем всё в Docker. Вот наш docker-compose.yml (обратите внимание, мы вынесли базу на нестандартный порт 5433, чтобы не конфликтовать с локальным Postgres, если он у вас уже установлен):

version: '3.8'

services:
  timescaledb:
    image: timescale/timescaledb:latest-pg16
    container_name: timescaledb
    restart: unless-stopped
    environment:
      POSTGRES_USER: ${DB_USER:-postgres}
      POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
      POSTGRES_DB: ${DB_NAME:-sensor_data}
      TZ: UTC
    ports:
      - "${DB_PORT:-5433}:5432"  # Маппинг нестандартного порта на хост
    volumes:
      - timescale_data:/var/lib/postgresql/data
      - ./backups:/backups
      - ./init:/docker-entrypoint-initdb.d
    networks:
      - iot_network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-postgres} -d ${DB_NAME:-sensor_data}"]
      interval: 30s
      timeout: 10s
      retries: 5
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
    deploy:
      resources:
        limits:
          memory: 1G
        reservations:
          memory: 512M

  pgadmin:  # Веб-интерфейс для удобного управления БД
    image: dpage/pgadmin4:latest
    container_name: pgadmin
    restart: unless-stopped
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@example.com
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_PASSWORD:-changeme}
      PGADMIN_CONFIG_SERVER_MODE: 'False'
    ports:
      - "5050:80"
    volumes:
      - pgadmin_data:/var/lib/pgadmin
    networks:
      - iot_network
    depends_on:
      - timescaledb

volumes:
  timescale_data:
    name: timescale_production_data
  pgadmin_data:
    name: pgadmin_storage

networks:
  iot_network:
    name: iot_sensor_network
    driver: bridge

Чтобы база сразу подготовилась к работе, создадим SQL-скрипт инициализации. Положите его в папку ./init/01-timescale.sql:

-- Подключение к нашей целевой базе
c sensor_data;

-- Включаем расширение TimescaleDB
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

-- Создаем отдельную схему для сенсоров
CREATE SCHEMA IF NOT EXISTS sensors;

-- Устанавливаем права
GRANT ALL PRIVILEGES ON SCHEMA sensors TO postgres;

-- Создаем базовую таблицу для данных, которую мы потом сделаем гипертаблицей
CREATE TABLE IF NOT EXISTS sensors.mqtt_consumer (
    time TIMESTAMPTZ NOT NULL,
	host TEXT,
    topic TEXT NOT NULL,
    value DOUBLE PRECISION NOT NULL
);

-- Превращаем таблицу в гипертаблицу TimescaleDB по полю time
SELECT create_hypertable('sensors.mqtt_consumer', 'time', if_not_exists => TRUE);

Рядом создаем файл переменных окружения .env:

# Настройки БД
DB_USER=postgres
DB_PASSWORD=super_password_123  # Обязательно поменяйте на свой в продакшене!
DB_NAME=sensor_data
DB_PORT=5433
PGADMIN_PASSWORD=super_password_pgadmin

Запускаем нашу базу:

docker-compose up -d

Проверяем, что всё поднялось и работает:

docker compose ps
docker exec -it timescaledb psql -U postgres -d sensor_data -c "SELECT extversion FROM pg_extension WHERE extname = 'timescaledb';"

Шаг 2. Настраиваем транспорт: брокер Mosquitto с авторизацией

Датчики будут слать данные по легковесному протоколу MQTT. Нам нужен брокер. Использовать публичные брокеры без паролей на производстве — это сразу приговор безопасности, поэтому мы настроим Mosquitto с жестким разделением прав (ACL) и авторизацией.

Создаем структуру папок на хосте:

mkdir ~/mosquitto && cd ~/mosquitto
mkdir config data

Пишем конфигурационный файл ./config/mosquitto.conf:

# Запрещаем анонимный доступ
allow_anonymous false

# Указываем пути к файлам авторизации внутри контейнера
password_file /mosquitto/config/passwd
acl_file /mosquitto/config/acl

# Стандартный MQTT порт
listener 1883 0.0.0.0

# Порт для веб-сокетов (если захотите выводить данные на веб-панель)
listener 9001
protocol websockets

# Включаем персистентность (сохранение сессий при перезапуске)
persistence true
persistence_location /mosquitto/data/

Теперь настроим права доступа в файле ./config/acl:

user admin
topic readwrite sensors/#

Важный нюанс по безопасности: Файл паролей нельзя создавать руками в блокноте — пароли должны быть зашифрованы. Мы сгенерируем файл с помощью утилиты mosquitto_passwd прямо через временный контейнер:

# Запускаем временный контейнер для генерации пароля (пользователь admin, пароль secret)
docker run --rm -it -v $(pwd)/config:/mosquitto/config eclipse-mosquitto mosquitto_passwd -c -b /mosquitto/config/passwd admin secret

Совет: Если нужно добавить еще одного пользователя без перезаписи первого, выполните ту же команду, но уберите флаг -c.

Решаем частую проблему с правами в Ubuntu: Mosquitto внутри контейнера работает от пользователя с UID 1883. Если права на файлы на хосте выставлены неверно, брокер выдаст ошибку Unable to open passwordfile и упадет. Лечим это на хосте:

sudo chown -R 1883:1883 ./config
sudo chmod 600 ./config/passwd
sudo chmod 700 ./config/acl
sudo chmod 755 ./config

Теперь создаем простой docker-compose.yml для Mosquitto в папке ~/mosquitto:

version: '3.8'

services:
  mosquitto:
    image: eclipse-mosquitto:latest
    container_name: mosquitto
    restart: unless-stopped
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./config:/mosquitto/config:ro  # Монтируем конфигурацию только для чтения
      - ./data:/mosquitto/data

Запускаем брокер:

docker compose up -d

Быстрый тест связи:

Установим утилиты на хост и проверим, что брокер не пускает анонимов, но отлично работает под нашей учеткой:

sudo apt-get update && sudo apt-get install -y mosquitto-clients

# В первом терминале запускаем подписку:
mosquitto_sub -h localhost -t "sensors/#" -u "admin" -P "secret" -v

# Во втором терминале отправляем тестовое значение:
mosquitto_pub -h localhost -t "sensors/esp8266/temperature" -m "24.5" -u "admin" -P "secret"

Если в первом терминале появилось сообщение — транспорт готов!

Шаг 3. Строим мост: Установка и настройка Telegraf

Нам нужен надёжный инструмент, который забирает данные из MQTT и складывает в TimescaleDB. Писать собственный скрипт на Python для этого — классический велосипед, который развалится при первом же сбое сети. Поэтому берём промышленное решение от InfluxData, Telegraf. Он буферизирует данные в памяти, если база временно недоступна, и почти не грузит систему.

Установим Telegraf на наш Ubuntu-сервер по официальной инструкции:

# Очищаем старые ключи, если они были
sudo rm -f /etc/apt/sources.list.d/influxdata.list
sudo rm -f /etc/apt/trusted.gpg.d/influxdata-archive*

# Добавляем официальный репозиторий
wget -q https://repos.influxdata.com/influxdata-archive.key
cat influxdata-archive.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive.gpg > /dev/null
echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list

# Устанавливаем
sudo apt-get update && sudo apt-get install telegraf -y

Теперь настроем конфигурационный файл /etc/telegraf/telegraf.conf. Нам нужно указать ему слушать топики в MQTT и записывать их в схему sensors нашей базы данных:

[agent]
  interval = "10s"
  round_interval = true
  flush_interval = "10s"

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = ["sensors/esp8266/temperature"]
  data_format = "value"
  data_type = "float"
  username = "admin"
  password = "secret"
  client_id = "telegraf_t1"
  persistent_session = false
[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = ["sensors/esp8266/humidity"]
  data_format = "value"
  data_type = "float"
  username = "admin"
  password = "secret"
  client_id = "telegraf_t2"
  persistent_session = false

[[outputs.postgresql]]
  connection = "host=localhost port=5433 user=postgres password=super_password_123 dbname=sensor_data sslmode=disable"

Проверим конфигурацию на ошибки синтаксиса:

sudo telegraf --config /etc/telegraf/telegraf.conf --test

Если ошибок нет, перезапускаем службу:

sudo systemctl restart telegraf

Шаг 4. Пишем «мозг»: MCP-сервер на Go

Вот мы и подошли к самому интересному. Нам нужно написать легковесный сервер, который будет общаться с LLM по протоколу MCP.

Почему Go? Он компилируется в один компактный бинарник без внешних зависимостей, работает очень быстро и почти не ест память. Можно сказать, что это идеальный вариант для промышленного шлюза рядом с базой.

Создаем проект:

mkdir mcp-timescale-server && cd mcp-timescale-server
go mod init mcp-timescale-server
go get github.com/modelcontextprotocol/go-sdk
go get github.com/jackc/pgx/v5

Создаем файл main.go. Наш MCP-сервер будет предоставлять один инструмент — query_sensor_database.

Важный момент безопасности: Чтобы ИИ случайно (или умышленно) не удалил базу, мы жестко ограничиваем логику работы на уровне кода: сервер принимает только SELECT-запросы. Для продакшена мы также настоятельно рекомендуем создать в PostgreSQL пользователя с правами исключительно SELECT (read-only) на таблицу sensors.mqtt_consumer.

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strings"
    
    "github.com/jackc/pgx/v5"
    "github.com/modelcontextprotocol/go-sdk/mcp"
)

// Структура входных данных, которую сформирует LLM
type QueryInput struct {
    SqlQuery string `json:"sql_query" jsonschema:"Полный SQL запрос SELECT для чтения данных из TimescaleDB. Запрос должен быть безопасным и использовать только SELECT."`
}

// Структура ответа для LLM
type QueryOutput struct {
    Result string `json:"result" jsonschema:"Результат выполнения запроса"`
}

// Обработчик вызова инструмента ИИ
func ExecuteQuery(ctx context.Context, request *mcp.CallToolRequest, input QueryInput) (*mcp.CallToolResult, QueryOutput, error) {
    // Безопасность: грубая, но полезная валидация на уровне кода
    cleanQuery := strings.TrimSpace(strings.ToLower(input.SqlQuery))
    if !strings.HasPrefix(cleanQuery, "select") {
        return nil, QueryOutput{}, fmt.Errorf("разрешены только запросы на чтение данных (SELECT)")
    }

    connStr := os.Getenv("TIMESCALE_URL")
    if connStr == "" {
        connStr = "postgres://postgres:super_password_123@localhost:5433/sensor_data?sslmode=disable"
    }
    
    conn, err := pgx.Connect(ctx, connStr)
    if err != nil {
        return nil, QueryOutput{}, fmt.Errorf("ошибка подключения к БД: %w", err)
    }
    defer conn.Close(ctx)

    rows, err := conn.Query(ctx, input.SqlQuery)
    if err != nil {
        return nil, QueryOutput{}, fmt.Errorf("ошибка выполнения SQL: %w", err)
    }
    defer rows.Close()

    var resultText string
    for rows.Next() {
        values, err := rows.Values()
        if err != nil {
            continue
        }
        resultText += fmt.Sprintf("%vn", values)
    }
    
    if resultText == "" {
        resultText = "Запрос выполнен успешно, строк не возвращено."
    }

    return nil, QueryOutput{Result: resultText}, nil
}

func main() {
    // Инициализируем MCP сервер
    server := mcp.NewServer(&mcp.Implementation{
        Name:    "timescale-mcp-brain",
        Version: "v1.0.0",
    }, nil)

    // Регистрируем инструмент, подробно описывая его назначение для LLM
    mcp.AddTool(server, &mcp.Tool{
        Name:        "query_sensor_database",
        Description: "Выполняет SQL SELECT запросы к базе данных TimescaleDB с данными датчиков температуры и влажности. Используй для аналитики, агрегации или получения последних значений. Таблица: sensors.mqtt_consumer (поля: time, host, topic, value).",
    }, ExecuteQuery)

    log.Println("MCP Brain server запущен на STDIO транспорте")
    
    // Запускаем сервер через стандартные потоки ввода-вывода (Стандарт связи для MCP)
    if err := server.Run(context.Background(), &mcp.StdioTransport{}); err != nil {
        log.Fatal(err)
    }
}

Собираем наш проект в бинарный файл:

go build -o mcp-brain main.go

Шаг 5. Подключаем ИИ (Настройка клиента)

Теперь нам нужно подружить наш скомпилированный сервер с ИИ-клиентом. К примеру, если вы используете OpenCode, откройте его файл конфигурации настроек MCP (обычно лежит в ~/.config/opencode/opencode.json на Linux/macOS или C:Users%USER%.configopencodeopencode.jsonc на Windows) и добавьте туда наш сервер:

{
  "mcpServers": {
    "timescale-brain": {
      "type": "local",
      "command": "/home/user/mcp-timescale-server/mcp-brain",
      "enabled": true,
      "timeout": 30000,
      "env": {
        "TIMESCALE_URL": "postgres://postgres:super_password_123@localhost:5433/sensor_data?sslmode=disable"
      }
    }
  }
}

(Не забудьте заменить путь /home/user/... на ваш реальный путь к бинарнику).

Перезапустите OpenCode. В списке MCP-серверов должен появиться timescale-brain с зеленой точкой. Это означает, что инструмент готов к работе. OpenCode можно запускать на одной или нескольких рабочих станциях, не обязательно на сервере.

Заключение

Мы создали полноценный, масштабируемый и безопасный мост между сырыми промышленными данными и аналитикой современных LLM. Model Context Protocol открывает огромные возможности. Теперь инженеру не нужно быть экспертом в SQL и Python для глубокого анализа телеметрии. Достаточно уметь формулировать мысли.

Конечно, это лишь базовая архитектура. В реальных проектах мы добавляем к ней интеграцию со SCADA-системами, промышленные шлюзы на Modbus/OPC UA, политики безопасности и тонкую настройку ИИ-агентов под внутренние регламенты предприятий.

Если вам интересна тема интеграции искусственного интеллекта в реальный сектор экономики, автоматизации производств или умных систем мониторинга — мы, команда AWWANTIL, готовы помочь в реализации проектов любой сложности и предоставим существенную скидку на интересный проект.

Пишите нам в MAX, в Telegram или на почту: info [собака] awwantil.ru — обсудим ваши задачи и найдем оптимальное архитектурное решение.

Автор: vpomo

Источник

Оставить комментарий