作为 Nevados 软件团队的一部分,我们正在为 Nevados All Terrain Tracker® 构建一个操作和监控平台。太阳能跟踪器是一种将太阳能电池板朝向太阳的装置。每个太阳能跟踪器都会不断向我们的平台发送状态信息和读数,例如当前角度、温度、电压等,我们需要存储这些信息以进行分析和可视化。如果跟踪器配置为每 5 秒发送一次数据,则每个跟踪器每天有 17,280 个数据点,每个跟踪器每月有 518,400 个数据点。这总结了很多信息。这种数据被称为“时间序列数据”,对于软件中的所有复杂问题,都有多种解决方案(时间序列数据库)。最著名的是 InfluxDB 和 TimescaleDB。对于我们的平台,我们决定使用TDEngine ,这是一种相对较新的产品,针对 IoT 应用程序进行了优化,并使用 SQL 查询语言。
对于这个决定有几个论据: TDEngine
在本文中,我们将介绍 TDEngine 数据库和表的设置,以及如何创建 GraphQL 架构,使我们能够查询来自各种客户端和应用程序的数据。
开始使用 TDEngine 的最简单方法是使用他们的云服务。转到TDEngine并创建一个帐户。他们有一些我们可以使用的公共数据库,这对于制作演示或查询实验来说非常有用。
如果你想在本地运行 TDEngine,你可以使用 Docker 镜像和Telegraf从各种来源检索数据并将其发送到数据库,例如系统信息、ping 统计信息等。
version: '3.9' services: tdengine: restart: always image: tdengine/tdengine:latest hostname: tdengine container_name: tdengine ports: - 6030:6030 - 6041:6041 - 6043-6049:6043-6049 - 6043-6049:6043-6049/udp volumes: - data:/var/lib/taos telegraf: image: telegraf:latest links: - tdengine env_file: .env volumes: - ./telegraf.conf:/etc/telegraf/telegraf.conf
查看Telegraf 配置的官方文档和Telegraf 上的 TDEngine 文档。简而言之,连接到 MQTT 主题看起来像这样:
[agent] interval = "5s" round_interval = true omit_hostname = true [[processors.printer]] [[outputs.http]] url = "http://127.0.0.1:6041/influxdb/v1/write?db=telegraf" method = "POST" timeout = "5s" username = "root" password = "taosdata" data_format = "influx" [[inputs.mqtt_consumer]] topics = [ "devices/+/trackers", ]
在本文中,我们将使用公共数据库,其中包含来自美国 5 个主要港口的船舶移动情况,而不是在本地设置所有内容并等待数据库填充信息。
默认情况下,TDEngine 中的表具有隐式架构,这意味着该架构会适应写入数据库的数据。这对于引导非常有用,但最终,我们希望切换到显式模式以避免传入数据出现问题。需要一点时间来适应的一件事是他们的超级表(简称“STable”)的概念。 TDEngine 中有标签(键)和列(数据)。对于每个组合键,都会创建一个“表”。所有表都分组在 STable 中。
查看vessel
数据库,他们有一个名为ais_data
的稳定表,其中包含很多表。通常,我们不想针对每个表进行查询,而是始终使用 STable 从所有表中获取累积的数据。
TDEngine 有一个函数DESCRIBE
,它允许我们检查表或 STable 的模式。 ais_data
具有以下架构:
STable 有两个键和六个数据列。键是mmsi
和name
。我们可以使用常规的SQL语句来查询数据:
SELECT ts, name, latitude, longitude FROM vessel.ais_data LIMIT 100; ts name latitude longitude 2023-08-11T22:07:02.419Z GERONIMO 37.921673 -122.40928 2023-08-11T22:21:48.985Z GERONIMO 37.921688 -122.40926 2023-08-11T22:25:08.784Z GERONIMO 37.92169 -122.40926 ...
请记住,时间序列数据通常非常大,因此我们应该始终限制结果集。我们可以使用一些特定于时间序列的函数,例如PARTITION BY
,它按键对结果进行分组,对于获取最新更新的单个键很有用。例如:
SELECT last_row(ts, name, latitude, longitude) FROM vessel.ais_data PARTITION BY name; ts name latitude longitude 2023-09-08T13:09:34.951Z SAN SABA 29.375961 -94.86894 2023-09-07T18:05:01.230Z SELENA 33.678585 -118.1954 2023-09-01T17:23:24.145Z SOME TUESDAY 33.676563 -118.230606 ...
我建议阅读他们的SQL 文档以获取更多示例。在继续之前,请转到“编程”、“Node.js”并检索TDENGINE_CLOUD_URL
和TDENGINE_CLOUD_TOKEN
变量。
GraphQL 如今非常出名,并且有很多关于它的好文章。我们选择该技术是因为我们收集和处理来自不同来源的信息,而 GraphQL 允许我们透明地将它们组合到单个 API 中。
我们将使用令人惊叹的Fastify框架(目前是 Node.js 应用程序的默认选择)和Mercurius适配器。 Mercurius 和 Fastify 团队共同努力提供无缝体验,这是注重性能的 GraphQL API 的绝佳选择。 GraphQL Nexus是一个构建/生成模式和解析器的工具,因此我们不必手动编写所有内容。
有一些设置代码等需要完成,我将在此处跳过。您可以在GitHub 上找到完整的示例 - tdengine-graphql-example 。
我想在这篇文章中详细阐述两件相当具体的事情:
TDEngine 有一个Node.js 库,允许我们查询数据库。这使得连接和发送查询变得很容易,不幸的是,响应有点难以处理。所以我们写了一个小包装:
'use strict' import tdengine from '@tdengine/rest' import { tdEngineToken, tdEngineUrl } from '../config.js' import parseFields from 'graphql-parse-fields' const { options: tdOptions, connect: tdConnect } = tdengine tdOptions.query = { token: tdEngineToken } tdOptions.url = tdEngineUrl export default function TdEngine(log) { this.log = log const conn = tdConnect(tdOptions) this.cursor = conn.cursor() } TdEngine.prototype.fetchData = async function fetchData(sql) { this.log.debug('fetchData()') this.log.debug(sql) const result = await this.cursor.query(sql) const data = result.getData() const errorCode = result.getErrCode() const columns = result.getMeta() if (errorCode !== 0) { this.log.error(`fetchData() error: ${result.getErrStr()}`) throw new Error(result.getErrStr()) } return data.map((r) => { const res = {} r.forEach((c, idx) => { const columnName = columns[idx].columnName .replace(/`/g, '') .replace('last_row(', '') .replace(')', '') if (c !== null) { res[columnName] = c } }) return res }) }
这将返回一个可以传递到 GraphQL 上下文中的 TDEngine 对象。我们将主要使用fetchData
函数,在其中我们可以传入 SQL 查询并将结果作为对象数组返回。 TDEngine 分别返回元数据(列)、错误和数据。我们将使用元数据将列映射到常规对象列表中。这里的一个特殊情况是last_row
函数。这些列以last_row(ts)
、 last_row(name)
等形式返回,我们希望删除last_row
部分,以便属性将1:1 映射到GraphQL 模式。这是在columnName.replace
部分完成的。
不幸的是,TDEngine 没有像Postgraphile这样的模式生成器,我们不想编写和维护纯 GraphQL 模式,因此我们将使用 Nexus.js 来帮助我们实现这一点。我们将从两种基本类型开始: VesselMovement
和Timestamp
(标量类型)。 Timestamp
和TDDate
是将日期显示为时间戳或日期字符串的两种不同类型。这对于客户端应用程序(以及开发过程中)很有用,因为它可以决定使用哪种格式。 asNexusMethod
允许我们将该类型用作VesselMovement
模式中的函数。我们可以在类型定义中解析TDDate
以使用原始ts
时间戳值。
import { scalarType, objectType } from 'nexus' export const Timestamp = scalarType({ name: 'Timestamp', asNexusMethod: 'ts', description: 'TDEngine Timestamp', serialize(value) { return new Date(value).getTime() } }) export const TDDate = scalarType({ name: 'TDDate', asNexusMethod: 'tdDate', description: 'TDEngine Timestamp as Date', serialize(value) { return new Date(value).toJSON() } }) export const VesselMovement = objectType({ name: 'VesselMovement', definition(t) { t.ts('ts') t.tdDate('date', { resolve: (root) => root.ts }) t.string('mmsi') t.string('name') t.float('latitude') t.float('longitude') t.float('speed') t.float('heading') t.int('nav_status') } })
对于时间序列类型,我们使用Movement
或Series
后缀在界面中明确区分关系类型和时间序列类型。
现在我们可以定义查询。我们将从一个简单的查询开始,以获取 TDEngine 的最新动态:
import { objectType } from 'nexus' export const GenericQueries = objectType({ name: 'Query', definition(t) { t.list.field('latestMovements', { type: 'VesselMovement', resolve: async (root, args, { tdEngine }, info) => { const fields = filterFields(info) return tdEngine.fetchData( `select last_row(${fields}) from vessel.ais_data partition by mmsi;` ) } }) } })
GraphiQL是测试 API 和探索模式的绝佳工具,您可以通过在 Mercurius 中传递graphiql.enabled = true
来启用它。通过查询,我们可以看到按mmsi
分组的船舶的最新动态。不过,让我们更进一步。 GraphQL 的最大优点之一是它对客户端或应用程序来说是透明的层。我们可以从多个来源获取数据并将它们组合到同一个模式中。
不幸的是,我无法找到包含大量船舶信息的简单/免费 API。有Sinay ,但他们只在 Vessel 响应中提供name
、 mmsi
和imo
(我们已经在 TDEngine 中提供了)。为了举例,我们假设我们的数据库中没有该name
,我们需要从 Sinay 检索它。通过imo
我们还可以查询船舶的二氧化碳排放量,或者可以使用另一个 API 来检索图像、旗帜或其他信息,所有这些都可以在Vessel
类型中组合。
export const Vessel = objectType({ name: 'Vessel', definition(t) { t.string('mmsi') t.string('name') t.nullable.string('imo') t.list.field('movements', { type: 'VesselMovement' }) } })
正如您在此处所看到的,我们可以将列表字段movements
与来自 TDEngine 的时间序列数据一起包含在内。我们将添加另一个查询来获取船舶信息,解析器允许我们合并来自 TDEngine 和 Sinay 的数据:
t.field('vessel', { type: 'Vessel', args: { mmsi: 'String' }, resolve: async (root, args, { tdEngine }, info) => { const waiting = [ getVesselInformation(args.mmsi), tdEngine.fetchData( `select * from vessel.ais_data where mmsi = '${args.mmsi}' order by ts desc limit 10;` ) ] const results = await Promise.all(waiting) return { ...results[0][0], movements: results[1] } } })
🎉 这里我们有一个有效的 GraphQL API,它从 TDEngine 返回我们请求的特定容器的行。 getVesselInformation()
是一个从 Sinay 获取数据的简单包装器。我们将 TDEngine 结果添加到movements
属性中,GraphQL 将处理其余部分并将所有内容映射到模式。
与任何 SQL 数据库一样,我们需要小心用户输入。在上面的示例中,我们直接使用mmsi
输入,这使得该查询容易受到 SQL 注入攻击。为了举例,我们暂时忽略这一点,但在“现实世界”应用程序中,我们应该始终清理用户输入。有几个小型库可以清理字符串,在大多数情况下,我们只依赖数字(分页、限制等)和枚举(排序顺序),GraphQL 会为我们检查这些。
感谢 Dmitry Zaets 指出了这一点!
有一些事情超出了本文的范围,但我想简单提一下:
当我们开始该项目时,Nexus.js 是生成 GraphQL 模式的最佳选择。虽然稳定并且功能有些完整,但它缺乏维护和更新。有一个名为Pothos的基于插件的 GraphQL 模式构建器,它更加现代并且得到积极维护。如果您要开始一个新项目,我可能建议使用 Pothos 而不是 Nexus.js。
感谢莫萨特勒指出这一点!
正如您在上面的Vessel
解析器中看到的,两个数据源都会立即获取并处理。这意味着如果查询仅针对name
,我们仍然会获取响应的movements
。如果查询仅针对movements
,我们仍然从 Sinay 获取名称并可能为请求付费。
这是 GraphQL 的反模式,我们可以通过使用字段信息仅获取请求的数据来提高性能。解析器将字段信息作为第四个参数,但它们很难使用。相反,我们可以使用graphql-parse-fields
来获取请求字段的简单对象并调整解析器逻辑。
在我们的示例查询中,我们使用select *
从数据库中获取所有列,即使不需要它们。这显然很糟糕,我们可以使用相同的字段解析器来优化 sql 查询:
export function filterFields(info, context) { const invalidFields = ['__typename', 'date'] const parsedFields = parseFields(info) const fields = context ? parsedFields[context] : parsedFields const filteredFields = Object.keys(fields).filter( (f) => !invalidFields.includes(f) ) return filteredFields.join(',') }
该函数从 GraphQL 信息中返回一个以逗号分隔的字段列表。
const fields = filterFields(info) return tdEngine.fetchData( `select last_row(${fields}) from vessel.ais_data partition by mmsi;` )
如果我们请求ts
、 latitude
和longitude
,查询将如下所示:
select last_row(ts, latitude, longitude) from vessel.ais_data partition by mmsi;
如果该表中只有几列,这可能并不重要,但如果有更多的表和复杂的查询,这可能会对应用程序性能产生巨大的影响。
TDEngine 有一些特定于时间序列的扩展,可用于提高性能。例如,要检索最新条目,可以使用传统的 SQL 查询:
SELECT ts, name, latitude, longitude FROM vessel.ais_data order by ts desc limit 1;
执行需要 653 毫秒,而“TDEngine”查询只需要 145 毫秒:
SELECT last_row(ts, name, latitude, longitude) FROM vessel.ais_data;
每个表都有配置选项来优化last_row/first_row函数和其他缓存设置。我建议阅读TDEngine 文档。
简单版本:在本文中,我们设置了 TDEngine 时间序列数据库并定义了 GraphQL 架构以允许客户端应用程序连接和查询数据。
还有很多事情要做。我们有一个样板项目,可以在透明的界面中将复杂的时间序列数据与关系数据结合起来。在 Nevados,我们使用 PostgreSQL 作为主数据库,并以与上面的movement
示例相同的方式检索时间序列数据。这是将多个来源的数据组合到单个 API 中的好方法。另一个好处是仅在请求时才获取数据,这为客户端应用程序增加了很多灵活性。最后但并非最不重要的一点是,GraphQL 架构充当文档和合约,因此我们可以轻松勾选“API 文档”框。
如果您有任何问题或意见,请联系 BlueSky或加入 GitHub 上的讨论。
也发布在这里。