В составе команды разработчиков программного обеспечения в Невадосе мы создаем платформу управления и мониторинга для Nevados All Terrain Tracker®. Солнечный трекер — это устройство, которое ориентирует солнечную панель на солнце. Каждый солнечный трекер постоянно отправляет на нашу платформу информацию о состоянии и показания, такие как текущий угол, температура, напряжение и т. д., и нам необходимо хранить эту информацию для анализа и визуализации. Если трекер настроен на отправку данных каждые 5 секунд, у нас есть 17 280 точек данных на каждый трекер в день, 518 400 точек данных на каждый трекер в месяц. Это суммирует много информации. Этот вид данных называется «данными временных рядов», и что касается всех сложных проблем в программном обеспечении, для них существует несколько решений (базы данных временных рядов). Самые известные из них — InfluxDB и TimescaleDB. Для нашей платформы мы решили работать с TDEngine — относительно новым продуктом, оптимизированным для IoT-приложений и работающим с языком запросов SQL.
Аргументов в пользу такого решения было несколько: TDEngine
В этой статье мы рассмотрим настройку базы данных и таблиц TDEngine, а также то, как создать схему GraphQL, которая позволит нам запрашивать данные от различных клиентов и приложений.
Самый простой способ начать работу с TDEngine — использовать их облачный сервис. Зайдите в TDEngine и создайте учетную запись. У них есть несколько общедоступных баз данных, которые мы можем использовать, что отлично подходит для создания демо или экспериментов с запросами.
Если вы хотите запустить TDEngine локально, вы можете использовать образ Docker и Telegraf для получения данных из различных источников и отправки их в базу данных, таких как системная информация, статистика пинга и т. д.
version: '3.9' services: tdengine: restart: always image: tdengine/tdengine:latest hostname: tdengine container_name: tdengine ports: - 6030:6030 - 6041:6041 - 6043-6049:6043-6049 - 6043-6049:6043-6049/udp volumes: - data:/var/lib/taos telegraf: image: telegraf:latest links: - tdengine env_file: .env volumes: - ./telegraf.conf:/etc/telegraf/telegraf.conf
Ознакомьтесь с официальной документацией по конфигурации Telegraf и документацией TDEngine по Telegraf . Короче говоря, применительно к теме MQTT это будет выглядеть примерно так:
[agent] interval = "5s" round_interval = true omit_hostname = true [[processors.printer]] [[outputs.http]] url = "http://127.0.0.1:6041/influxdb/v1/write?db=telegraf" method = "POST" timeout = "5s" username = "root" password = "taosdata" data_format = "influx" [[inputs.mqtt_consumer]] topics = [ "devices/+/trackers", ]
Вместо того, чтобы настраивать все локально и ждать, пока база данных заполнится информацией, для этой статьи мы будем использовать общедоступную базу данных, которая содержит данные о движении судов из 5 основных портов США.
По умолчанию таблицы в TDEngine имеют неявную схему, то есть схема адаптируется к данным, записываемым в базу данных. Это отлично подходит для начальной загрузки, но в конечном итоге мы хотим переключиться на явную схему, чтобы избежать проблем с входящими данными. Одна вещь, к которой нужно немного времени, чтобы привыкнуть, — это концепция супертаблиц («STable» для краткости). В TDEngine есть теги (ключи) и столбцы (данные). Для каждой комбинации клавиш создается «таблица». Все таблицы сгруппированы в STable.
Глядя на базу данных vessel
, мы видим одну STable под названием ais_data
, которая содержит множество таблиц. Обычно мы не хотим выполнять запросы по каждой таблице, а всегда используем STable для получения накопленных данных из всех таблиц.
В TDEngine есть функция DESCRIBE
, которая позволяет нам проверять схему таблицы или STable. ais_data
имеет следующую схему:
STable имеет два ключа и шесть столбцов данных. Ключами являются mmsi
и name
. Мы можем использовать обычные операторы SQL для запроса данных:
SELECT ts, name, latitude, longitude FROM vessel.ais_data LIMIT 100; ts name latitude longitude 2023-08-11T22:07:02.419Z GERONIMO 37.921673 -122.40928 2023-08-11T22:21:48.985Z GERONIMO 37.921688 -122.40926 2023-08-11T22:25:08.784Z GERONIMO 37.92169 -122.40926 ...
Имейте в виду, что данные временных рядов обычно очень велики, поэтому мы всегда должны ограничивать набор результатов. Мы можем использовать несколько функций, специфичных для временных рядов, например PARTITION BY
, которая группирует результаты по ключу и полезна для получения последних обновлений отдельных ключей. Например:
SELECT last_row(ts, name, latitude, longitude) FROM vessel.ais_data PARTITION BY name; ts name latitude longitude 2023-09-08T13:09:34.951Z SAN SABA 29.375961 -94.86894 2023-09-07T18:05:01.230Z SELENA 33.678585 -118.1954 2023-09-01T17:23:24.145Z SOME TUESDAY 33.676563 -118.230606 ...
Я рекомендую прочитать их документацию по SQL для получения дополнительных примеров. Прежде чем двигаться дальше, перейдите в «Программирование», «Node.js» и получите переменные TDENGINE_CLOUD_URL
и TDENGINE_CLOUD_TOKEN
.
GraphQL в наши дни довольно хорошо известен, и о нем написано много хороших статей. Мы выбрали технологию, так как собираем и обрабатываем информацию из разных источников, а GraphQL позволяет нам прозрачно объединять их в единый API.
Мы будем использовать замечательную платформу Fastify (сейчас это выбор по умолчанию для приложений Node.js) и адаптер Mercurius . Команды Mercurius и Fastify работали вместе, чтобы обеспечить бесперебойную работу, и это отличный выбор API-интерфейсов GraphQL, ориентированных на производительность. GraphQL Nexus — это инструмент для построения/генерации схемы и преобразователей, поэтому нам не нужно писать все вручную.
Нужно написать немного кода настройки и т. д., который я здесь пропущу. Полный пример вы можете найти на GitHub — tdengine-graphql-example .
В этой статье я хочу остановиться на двух довольно конкретных вещах:
TDEngine имеет библиотеку Node.js , которая позволяет нам делать запросы к базе данных. Это упрощает подключение и отправку запросов, но, к сожалению, с ответами немного сложно работать. Итак, мы написали небольшую обертку:
'use strict' import tdengine from '@tdengine/rest' import { tdEngineToken, tdEngineUrl } from '../config.js' import parseFields from 'graphql-parse-fields' const { options: tdOptions, connect: tdConnect } = tdengine tdOptions.query = { token: tdEngineToken } tdOptions.url = tdEngineUrl export default function TdEngine(log) { this.log = log const conn = tdConnect(tdOptions) this.cursor = conn.cursor() } TdEngine.prototype.fetchData = async function fetchData(sql) { this.log.debug('fetchData()') this.log.debug(sql) const result = await this.cursor.query(sql) const data = result.getData() const errorCode = result.getErrCode() const columns = result.getMeta() if (errorCode !== 0) { this.log.error(`fetchData() error: ${result.getErrStr()}`) throw new Error(result.getErrStr()) } return data.map((r) => { const res = {} r.forEach((c, idx) => { const columnName = columns[idx].columnName .replace(/`/g, '') .replace('last_row(', '') .replace(')', '') if (c !== null) { res[columnName] = c } }) return res }) }
Это возвращает объект TDEngine, который можно передать в контекст GraphQL. В первую очередь мы будем использовать функцию fetchData
, с помощью которой мы сможем передать SQL-запрос и получить результаты обратно в виде массива объектов. TDEngine возвращает метаданные (столбцы), ошибки и данные отдельно. Мы будем использовать метаданные, чтобы сопоставить столбцы с обычным списком объектов. Особым случаем здесь является функция last_row
. Столбцы возвращаются как last_row(ts)
, last_row(name)
и т. д., и мы хотим удалить часть last_row
, чтобы атрибут сопоставлялся 1:1 со схемой GraphQL. Это делается в части columnName.replace
.
К сожалению, для TDEngine не существует генератора схем, такого как Postgraphile , и мы не хотим писать и поддерживать чистую схему GraphQL, поэтому воспользуемся Nexus.js, чтобы помочь нам в этом. Мы начнем с двух основных типов: VesselMovement
и Timestamp
(скалярный тип). Timestamp
и TDDate
— это два разных типа для отображения даты в виде отметки времени или в виде строки даты. Это полезно для клиентского приложения (и во время разработки), поскольку оно может решить, какой формат использовать. asNexusMethod
позволяет нам использовать тип как функцию в схеме VesselMovement
. Мы можем разрешить TDDate
прямо здесь, в определении типа, чтобы использовать исходное значение метки времени ts
.
import { scalarType, objectType } from 'nexus' export const Timestamp = scalarType({ name: 'Timestamp', asNexusMethod: 'ts', description: 'TDEngine Timestamp', serialize(value) { return new Date(value).getTime() } }) export const TDDate = scalarType({ name: 'TDDate', asNexusMethod: 'tdDate', description: 'TDEngine Timestamp as Date', serialize(value) { return new Date(value).toJSON() } }) export const VesselMovement = objectType({ name: 'VesselMovement', definition(t) { t.ts('ts') t.tdDate('date', { resolve: (root) => root.ts }) t.string('mmsi') t.string('name') t.float('latitude') t.float('longitude') t.float('speed') t.float('heading') t.int('nav_status') } })
Для типов временных рядов мы используем суффикс Movement
или Series
для четкого разделения реляционных типов и типов временных рядов в интерфейсе.
Теперь мы можем определить запрос. Мы начнем с простого запроса, чтобы получить последние изменения от TDEngine:
import { objectType } from 'nexus' export const GenericQueries = objectType({ name: 'Query', definition(t) { t.list.field('latestMovements', { type: 'VesselMovement', resolve: async (root, args, { tdEngine }, info) => { const fields = filterFields(info) return tdEngine.fetchData( `select last_row(${fields}) from vessel.ais_data partition by mmsi;` ) } }) } })
GraphiQL — отличный инструмент для тестирования API и изучения схемы. Вы можете включить его, передав graphiql.enabled = true
в Mercurius. С помощью запроса мы можем увидеть последние перемещения судов, сгруппированные по mmsi
. Однако пойдем немного дальше. Одним из самых больших преимуществ GraphQL является то, что это прозрачный уровень для клиента или приложения. Мы можем получать данные из нескольких источников и объединять их в одну схему.
К сожалению, мне не удалось найти простой/бесплатный API с подробной информацией о судне. Существует Sinay , но в ответе Vessel (который у нас уже есть в TDEngine) они предоставляют только name
, mmsi
и imo
. Для примера мы предполагаем, что в нашей базе данных нет этого name
, и нам нужно получить его из Sinay. С помощью imo
мы также могли бы запросить выбросы CO2 для судна или использовать другой API для получения изображения, флага или другой информации, и все это можно объединить в типе Vessel
.
export const Vessel = objectType({ name: 'Vessel', definition(t) { t.string('mmsi') t.string('name') t.nullable.string('imo') t.list.field('movements', { type: 'VesselMovement' }) } })
Как вы можете видеть здесь, мы можем включить movements
полей списка с данными временных рядов из TDEngine. Мы добавим еще один запрос для получения информации о судне, и преобразователь позволит нам объединить данные из TDEngine и Sinay:
t.field('vessel', { type: 'Vessel', args: { mmsi: 'String' }, resolve: async (root, args, { tdEngine }, info) => { const waiting = [ getVesselInformation(args.mmsi), tdEngine.fetchData( `select * from vessel.ais_data where mmsi = '${args.mmsi}' order by ts desc limit 10;` ) ] const results = await Promise.all(waiting) return { ...results[0][0], movements: results[1] } } })
🎉 и здесь у нас есть работающий GraphQL API, возвращающий строки из TDEngine для конкретного запрошенного нами судна. getVesselInformation()
— это простая оболочка для получения данных из Sinay. Мы добавим результаты TDEngine в атрибут movements
, а GraphQL позаботится обо всем остальном и сопоставит все со схемой.
Как и в случае с любой базой данных SQL, нам нужно быть осторожными с пользовательским вводом. В приведенном выше примере мы напрямую используем входные данные mmsi
, что делает этот запрос уязвимым для SQL-инъекций. Для примера мы пока проигнорируем это, но в «реальных» приложениях мы всегда должны очищать пользовательский ввод. Существует несколько небольших библиотек для очистки строк. В большинстве случаев мы полагаемся только на числа (нумерация страниц, лимит и т. д.) и перечисления (порядок сортировки), которые GraphQL проверяет за нас.
Спасибо Дмитрию Зацу за указание на это!
Есть несколько вещей, которые выходят за рамки этой статьи, но я хочу упомянуть их вкратце:
Когда мы начали проект, Nexus.js был лучшим выбором для создания схемы GraphQL. Несмотря на то, что он стабилен и в некоторой степени полнофункционален , ему не хватает обслуживания и обновлений. Существует построитель схем GraphQL на основе плагинов под названием Pothos , который немного более современный и активно поддерживается. Если вы начинаете новый проект, я, вероятно, рекомендую использовать Pothos вместо Nexus.js.
Спасибо Мо Саттлеру за указание на это!
Как вы можете видеть в преобразователе Vessel
выше, оба источника данных немедленно извлекаются и обрабатываются. Это означает, что если запрос касается только name
, мы все равно получим movements
для ответа. А если запрос касается только movements
, мы все равно получаем имя от Синай и потенциально платим за запрос.
Это антишаблон GraphQL, и мы можем повысить производительность, используя информацию поля для получения только запрошенных данных. Резолверы имеют информацию о поле в качестве четвертого аргумента, но с ними довольно сложно работать. Вместо этого мы можем использовать graphql-parse-fields
, чтобы получить простой объект запрошенных полей и настроить логику преобразователя.
В наших примерах запросов мы используем select *
для извлечения всех столбцов из базы данных, даже если они не нужны. Очевидно, это очень плохо, и мы можем использовать тот же анализатор полей для оптимизации запросов sql:
export function filterFields(info, context) { const invalidFields = ['__typename', 'date'] const parsedFields = parseFields(info) const fields = context ? parsedFields[context] : parsedFields const filteredFields = Object.keys(fields).filter( (f) => !invalidFields.includes(f) ) return filteredFields.join(',') }
Эта функция возвращает список полей, разделенных запятыми, из информации GraphQL.
const fields = filterFields(info) return tdEngine.fetchData( `select last_row(${fields}) from vessel.ais_data partition by mmsi;` )
Если мы запросим ts
, latitude
и longitude
, запрос будет выглядеть так:
select last_row(ts, latitude, longitude) from vessel.ais_data partition by mmsi;
Если в этой таблице всего несколько столбцов, это может не иметь большого значения, но при наличии большего количества таблиц и сложных запросов это может существенно повлиять на производительность приложения.
TDEngine имеет некоторые расширения, специфичные для временных рядов, которые следует использовать для повышения производительности. Например, чтобы получить последнюю запись, можно использовать традиционный SQL-запрос:
SELECT ts, name, latitude, longitude FROM vessel.ais_data order by ts desc limit 1;
Выполнение занимает 653 мс, тогда как запрос «TDEngine» занимает всего 145 мс:
SELECT last_row(ts, name, latitude, longitude) FROM vessel.ais_data;
Для каждой таблицы существуют параметры конфигурации для оптимизации функций Last_row/first_row и других параметров кэша. Рекомендую прочитать документацию TDEngine .
Простая версия: в этой статье мы настроили базу данных временных рядов TDEngine и определили схему GraphQL, позволяющую клиентским приложениям подключаться и запрашивать данные.
Это еще не все. У нас есть шаблонный проект для объединения сложных данных временных рядов с реляционными данными в прозрачном интерфейсе. В Nevados мы используем PostgreSQL в качестве основной базы данных и получаем данные временных рядов так же, как в приведенном выше примере movement
. Это отличный способ объединить данные из нескольких источников в одном API. Еще одним преимуществом является то, что данные извлекаются только по запросу, что добавляет клиентскому приложению большую гибкость. И последнее, но не менее важное: схема GraphQL работает как документация и контракт, поэтому мы можем легко поставить галочку в поле «Документация API».
Если у вас есть вопросы или комментарии , обращайтесь на BlueSky или присоединяйтесь к обсуждению на GitHub .
Также опубликовано здесь .