InfluxDB 2.0 的数据查询语法

更新日期: 2022-08-11 阅读次数: 25682 字数: 904 分类: InfluxDB

目标

  • 查询最近的50条数据。可以分页,符合 Antd Pro Table 的显示规范。
  • 查询指定时间段内的数据。
  • 在时间跨度大的情况下,可以聚合数据,避免取出的数据过多。
  • 指定 field key 查询数据,单 field,多 field
  • golang sdk 相关的查询方法使用

Flux

Flux 是 InfluxDB 2.0 引入的一门查询语言,号称借鉴了 Js 的语法。

相关英文单词

  • influx: n. 流入;汇集;河流的汇集处
  • flux: n. 流量;变迁;不稳定;流出; vt. 使熔融;用焊剂处理; vi. 熔化;流出
  • mean: 平均值
  • aggregate: 聚合
  • downsampling: 降采样

通过实例学习 Flux 查询语法

实例一:

from(bucket:"example-bucket")
  |> range(start: -1h, stop: -10m)
  • 第一行指定数据源,即指定 bucket
  • 第二行限定时间区间。Flux 查询必须限定时间区间。
  • 如果 stop 省略,就默认是当前时间。
  • 竖杠右箭头组合,是 Pipe-forward operator, 类似 linux 管道。即将上一步的查询结果,传递给下一步操作

实例二:

from(bucket:"example-bucket")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> yield()
  • filter 对每行 record 进行过滤
  • fn 是 function 的缩写,类似 js 中的匿名函数,这里非常像箭头函数
  • r 是 record 的缩写
  • yield 默认是可以省略的,只有当包含多个查询时,才需要使用 yield。但是我没理解什么是多个查询。。。
  • measurement, field 比较好理解,最后一个过滤条件 cpu 是 tag 过滤

分页

https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/built-in/transformations/limit/#offset

limit(
  n:10,
  offset: 0
)

代码:

from(bucket:"example-bucket")
  |> range(start:-1h)
  |> limit(n:10)

需要注意的是,这里的 limit 数,是指每个 table 返回一条数据,所以如果有 4 个 table,就会返回 4 条。。。

例如,有4个 field key,就是 4 个 Record,分属于 4 个不同的 table。

按时间倒序

from(bucket:"example-bucket")
  |> order(desc: true)
  |> limit(n:10)

https://docs.influxdata.com/flux/v0.x/stdlib/universe/sort/

注意:但是使用 pivot 时,这个倒序的排序很诡异,尽量不要使用

golang sdk

InfluxDB Golang Client 能返回两种格式的查询结果:

  • QueryTableResult
  • Raw string

如何将 QueryTableResult 转成 JSON 输出:

https://stackoverflow.com/questions/65425375/convert-data-obtained-from-influxdb-to-json

// QueryTableResult parses streamed flux query response into structures representing flux table parts
// Walking though the result is done by repeatedly calling Next() until returns false.
// Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
// Data are acquired by Record() method.
// Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
type QueryTableResult struct {
	io.Closer
	csvReader     *csv.Reader
	tablePosition int
	tableChanged  bool
	table         *query.FluxTableMetadata
	record        *query.FluxRecord
	err           error
}

// FluxRecord represents row in the flux query result table
type FluxRecord struct {
	table  int
	values map[string]interface{}
}

// Value returns the default _value column value or nil if not present
func (r *FluxRecord) Value() interface{} {
	return r.ValueByKey("_value")
}

// Values returns map of the values where key is the column name
func (r *FluxRecord) Values() map[string]interface{} {
	return r.values
}

从 QueryTableResult 的结构看,还是定义一个 struct slice 然后循环写入比较靠谱。

打印一行看看数据结构:

	// get QueryTableResult
	result, err := models.QueryAPI.Query(context.Background(), `
		from(bucket:"sunzhongwei")
			|> range(start: -7d) 
			|> filter(fn: (r) => 
				r._measurement == "sunzhongwei"
			)`)
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Access data
			//fmt.Printf("value: %v\n", result.Record().Value())
			fmt.Printf("value: %v\n", result.Record().Values())
		}
		// check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}

对输出进行了排版,方便查看

value: map[
	_field:temperature 
	_measurement:sunzhongwei 
	_start:2021-06-09 08:42:28.6152305 +0000 UTC 
	_stop:2021-06-16 08:42:28.6152305 +0000 UTC 
	_time:2021-06-15 08:15:04.0473985 +0000 UTC 
	_value:87.32 
	deviceId:justtest 
	result:_result 
	table:4
]

可以看到里面有不少多余的字段,例如 measurement, start, stop; 实际上只需要保留 time, value,field, deviceId (自定义 tag) 就足够在前端展示了。

参考

  • https://docs.influxdata.com/influxdb/v2.0/query-data/get-started/
  • https://github.com/influxdata/influxdb-client-go#queries

关于作者 🌱

我是来自山东烟台的一名开发者,有敢兴趣的话题,或者软件开发需求,欢迎加微信 zhongwei 聊聊, 查看更多联系方式