跳到主要内容

连续查询

连续查询能够自动对数据进行降采样和聚合,生成物化视图,从而在保持可查询历史数据的同时,降低存储需求。

概述

IEDB 中的连续查询可以帮助您:

  • 数据降采样:将高频数据聚合为低频摘要
  • 减少存储空间:存储聚合数据而非原始数据。
  • 保留历史记录:保留长期趋势,但无需完全数颗粒度数据
  • 提升查询性能:查询预聚合数据以加快结果获取速度
  • 创建物化视图:自动维护聚合数据集

工作原理

连续查询将源测量数据聚合到目标测量数据中:

  1. 定义查询:使用 SQL 指定聚合逻辑
  2. 设置计划:配置分组的时间间隔(例如,每小时、每天)
  3. 手动执行:通过 API 和开始/结束时间触发执行
  4. 存储结果:将汇总数据写入新的测量表值
  5. 应用保留策略:可选择为聚合数据设置自定义保留策略

流程图

源测量表 (cpu)

连续查询 (AVG, MAX, MIN, etc.)

目标测量表 (cpu_hourly)

可选: 保留策略

API 接口

创建连续查询

定义一个新的连续查询:

POST /api/v1/continuous_queries

body

{
"name": "cpu_hourly_avg",
"database": "telegraf",
"source_measurement": "cpu",
"destination_measurement": "cpu_hourly",
"query": "SELECT time_bucket('1 hour', time) AS time, host, AVG(usage_idle) AS avg_usage_idle, AVG(usage_user) AS avg_usage_user, COUNT(*) AS sample_count FROM telegraf.cpu GROUP BY time_bucket('1 hour', time), host",
"interval": "1h",
"retention_policy": "90d",
"is_active": true
}

参数

  • name(字符串,必填):唯一查询标识符
  • database(字符串,必填):目标数据库名称
  • source_measurement(字符串,必填):要聚合的源测量表
  • destination_measurement(字符串,必填):目标存储位置
  • query(字符串,必填):SQL 聚合查询
  • interval(字符串,必填):时间段间隔(1m``5m``1h``1d等等)
  • retention_policy(字符串,可选):聚合数据的保留期限(例如90d365d
  • is_active(布尔值,必填):启用/禁用查询

连续查询

检索所有连续查询列表:

GET /api/v1/continuous_queries

响应

[
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "cpu_hourly_avg",
"database": "telegraf",
"source_measurement": "cpu",
"destination_measurement": "cpu_hourly",
"interval": "1h",
"retention_policy": "90d",
"is_active": true,
"created_at": "2024-01-15T10:30:00Z",
"last_executed_at": "2024-01-20T02:00:00Z"
}
]

获取单个查询

检索特定的连续查询:

GET /api/v1/continuous_queries/{query_id}

更新连续查询

更新现有连续查询:

PUT /api/v1/continuous_queries/{query_id}

body:与创建查询相同

删除连续查询

删除连续查询:

DELETE /api/v1/continuous_queries/{query_id}

警告 删除连续查询不会删除目标测量表或其数据。聚合数据仍然可查询。

执行连续查询

手动触发连续查询:

POST /api/v1/continuous_queries/{query_id}/execute

body

{
"start_time": "2024-01-01T00:00:00Z",
"end_time": "2024-01-31T23:59:59Z",
"dry_run": false
}

参数

  • start_time(字符串,必填):起始时间戳(ISO 8601 格式)
  • end_time(字符串,必填):结束时间戳(ISO 8601 格式)
  • dry_run(布尔值,可选):不写入数据进行测试(默认值false

响应

{
"query_id": "550e8400-e29b-41d4-a716-446655440000",
"rows_processed": 1000000,
"rows_written": 720,
"execution_time_ms": 2500,
"time_range": {
"start": "2024-01-01T00:00:00Z",
"end": "2024-01-31T23:59:59Z"
},
"dry_run": false
}

查看执行历史记录

查看连续查询的历史执行记录:

GET /api/v1/continuous_queries/{query_id}/executions?limit=50

响应

[
{
"execution_id": "abc123",
"executed_at": "2024-01-20T02:00:00Z",
"start_time": "2024-01-19T00:00:00Z",
"end_time": "2024-01-20T00:00:00Z",
"rows_processed": 86400,
"rows_written": 24,
"execution_time_ms": 1200,
"status": "success"
}
]

查询语法

连续查询使用 SQL 并进行时间优化。

推荐方法

用于epoch_us()时间戳转换和date_trunc()时间段划分:

SELECT
date_trunc('hour', epoch_us(time)) AS time,
host,
AVG(usage_idle) AS avg_usage_idle,
MAX(usage_user) AS max_usage_user,
MIN(usage_system) AS min_usage_system,
COUNT(*) AS sample_count
FROM telegraf.cpu
GROUP BY date_trunc('hour', epoch_us(time)), host

支持的聚合

  • AVG()平均值
  • SUM()- 数值总和
  • MIN()- 最小值
  • MAX()- 最大值
  • COUNT()行数
  • STDDEV()标准差
  • PERCENTILE_CONT()- 百分位数计算

时间维度聚合

使用**date_trunc()**

-- 小时
date_trunc('hour', epoch_us(time))

-- 天
date_trunc('day', epoch_us(time))

-- 5分钟
date_trunc('hour', epoch_us(time)) + INTERVAL '5 minutes' * floor(extract(minute from epoch_us(time)) / 5)

样本计数

务必COUNT(*)记录每个聚合体所代表的原始样本数量:

SELECT
date_trunc('hour', epoch_us(time)) AS time,
host,
AVG(usage_idle) AS avg_usage_idle,
COUNT(*) AS sample_count -- Important for data quality
FROM telegraf.cpu
GROUP BY date_trunc('hour', epoch_us(time)), host

使用示例

示例 1:每小时 CPU 指标

将每秒 CPU 指标汇总成每小时平均值:

import requests

# 创建连续查询
response = requests.post(
"http://localhost:8000/api/v1/continuous_queries",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"name": "cpu_hourly",
"database": "telegraf",
"source_measurement": "cpu",
"destination_measurement": "cpu_hourly",
"query": """
SELECT
date_trunc('hour', epoch_us(time)) AS time,
host,
AVG(usage_idle) AS avg_usage_idle,
AVG(usage_user) AS avg_usage_user,
AVG(usage_system) AS avg_usage_system,
MAX(usage_user) AS max_usage_user,
COUNT(*) AS sample_count
FROM telegraf.cpu
GROUP BY date_trunc('hour', epoch_us(time)), host
""",
"interval": "1h",
"retention_policy": "365d",
"is_active": True
}
)

query_id = response.json()["id"]

# 统计过去三十天
from datetime import datetime, timedelta

end_time = datetime.utcnow()
start_time = end_time - timedelta(days=30)

result = requests.post(
f"http://localhost:8000/api/v1/continuous_queries/{query_id}/execute",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"start_time": start_time.isoformat() + "Z",
"end_time": end_time.isoformat() + "Z"
}
)

print(f"Processed {result.json()['rows_processed']} rows")
print(f"Generated {result.json()['rows_written']} aggregated rows")

示例 2:每日请求汇总

将 API 请求日志汇总成每日摘要:

response = requests.post(
"http://localhost:8000/api/v1/continuous_queries",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"name": "requests_daily",
"database": "logs",
"source_measurement": "api_requests",
"destination_measurement": "api_requests_daily",
"query": """
SELECT
date_trunc('day', epoch_us(time)) AS time,
endpoint,
status_code,
COUNT(*) AS total_requests,
AVG(response_time_ms) AS avg_response_time,
MAX(response_time_ms) AS max_response_time,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time_ms) AS p95_response_time
FROM api_requests
GROUP BY date_trunc('day', epoch_us(time)), endpoint, status_code
""",
"interval": "1d",
"retention_policy": "730d", # 2 years
"is_active": True
}
)

示例 3:5 分钟传感器读数

将物联网传感器数据降采样至 5 分钟间隔:

response = requests.post(
"http://localhost:8000/api/v1/continuous_queries",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"name": "sensors_5min",
"database": "iot",
"source_measurement": "temperature",
"destination_measurement": "temperature_5min",
"query": """
SELECT
date_trunc('hour', epoch_us(time)) +
INTERVAL '5 minutes' * floor(extract(minute from epoch_us(time)) / 5) AS time,
sensor_id,
location,
AVG(temperature) AS avg_temperature,
MIN(temperature) AS min_temperature,
MAX(temperature) AS max_temperature,
COUNT(*) AS sample_count
FROM temperature
GROUP BY
date_trunc('hour', epoch_us(time)) +
INTERVAL '5 minutes' * floor(extract(minute from epoch_us(time)) / 5),
sensor_id,
location
""",
"interval": "5m",
"retention_policy": "90d",
"is_active": True
}
)

示例 4:预运行测试

在执行之前测试连续查询:

# 创建查询
response = requests.post(
"http://localhost:8000/api/v1/continuous_queries",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={...}
)

query_id = response.json()["id"]

# 试运行测试
dry_run = requests.post(
f"http://localhost:8000/api/v1/continuous_queries/{query_id}/execute",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"start_time": "2024-01-01T00:00:00Z",
"end_time": "2024-01-02T00:00:00Z",
"dry_run": True
}
)

print(f"Would process {dry_run.json()['rows_processed']} rows")
print(f"Would generate {dry_run.json()['rows_written']} aggregated rows")

# 真实执行
if dry_run.json()['rows_written'] > 0:
result = requests.post(
f"http://localhost:8000/api/v1/continuous_queries/{query_id}/execute",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"start_time": "2024-01-01T00:00:00Z",
"end_time": "2024-01-02T00:00:00Z",
"dry_run": False
}
)

存储优势

连续查询可显著降低存储需求:

降采样之前

原始 CPU 指标(1 秒间隔):

  • 1 年 = 每台主机 31,536,000 行
  • 10 台主机 = 315,360,000 行
  • 存储空间:约 20GB

降采样至小时后

小时汇总

  • 1 年 = 每个主机 8,760 行
  • 10 台主机 = 87,600 行
  • 存储空间:约 50MB

缩减:缩小约 400 倍,同时保持每小时趋势可见性。

多层战略

结合不同粒度以实现最佳存储效果:

# 层级1:原始数据保留7天
# 层级2:小时聚合数据保留90天
requests.post("/api/v1/continuous_queries", json={
"name": "cpu_hourly",
"interval": "1h",
"retention_policy": "90d"
})

# 层级3:天聚合数据保留2年
requests.post("/api/v1/continuous_queries", json={
"name": "cpu_daily",
"source_measurement": "cpu_hourly", # 对小时聚合数据进行二次聚合
"destination_measurement": "cpu_daily",
"interval": "1d",
"retention_policy": "730d"
})

# 使用保留策略在7天后删除原始数据
requests.post("/api/v1/retention", json={
"database": "telegraf",
"measurement": "cpu",
"retention_days": 7
})

最佳实践

1. 建议保守

开始时间隔时间要长一些,然后根据实际需要进行调整:

# 从小时粒度开始
{"interval": "1h"}

# 如果粒度太粗,可调整为15分钟
{"interval": "15m"}

2. 初始阶段保留原始数据

在测试聚合数据时保留原始数据:

# 创建连续查询
create_query(...)

# 彻底测试聚合
execute_dry_run(...)
execute_for_real(...)

# 仅在验证后,对原始数据应用保留策略
requests.post("/api/v1/retention", json={
"measurement": "cpu",
"retention_days": 30 # 原始数据保留30天
})

3. 使用预运行测试

在正式执行查询之前,务必先进行预运行测试:

# 先在小时间范围内测试
dry_run(start="2024-01-01", end="2024-01-02")

# 逐步扩大范围
dry_run(start="2024-01-01", end="2024-01-07")

# 最后,全量执行
execute(start="2024-01-01", end="2024-12-31")

4. 样本计数

追踪每个汇总样本中的原始样本数量:

SELECT
date_trunc('hour', epoch_us(time)) AS time,
COUNT(*) AS sample_count, -- Essential for data quality
AVG(value) AS avg_value
FROM measurement
GROUP BY date_trunc('hour', epoch_us(time))

这有助于识别:

  • 数据缺失(样本量过低)
  • 数据质量问题
  • 意想不到的情况

5. 监控执行性能

监控连续查询执行时间:

result = execute_query(...)

print(f"Execution time: {result['execution_time_ms']}ms")
print(f"Throughput: {result['rows_processed'] / (result['execution_time_ms'] / 1000):.0f} rows/sec")

# Alert if execution takes too long
if result['execution_time_ms'] > 60000: # 1 minute
print("Warning: Slow execution!")

6. 使用适当的间隔

将区间与数据特征相匹配:

高频数据(物联网传感器,每 1 秒间隔):

  • 近期分析的 5 分钟汇总数据
  • 中期每小时汇总数据
  • 长期趋势的每日汇总数据

中频数据(每分钟间隔的应用指标):

  • 近期分析的小时汇总数据
  • 长期每日汇总数据

低频数据(每小时一次的业务指标):

  • 每日汇总
  • 用于多年分析的月度汇总数据

故障排除

未写入任何行

问题:执行返回rows_written: 0

解决方案

  • 验证源测量数据是否包含在指定的时间范围内。
  • 请检查查询语法是否正确
  • 确保GROUP BY子句与聚合列匹配
  • 使用试运行来检查查询结果

查询语法错误

问题:执行失败,出现 SQL 错误。

解决方案

  • /query使用端点直接测试查询
  • 验证源测量中是否存在列名
  • 检查 DuckDB 特有的语法要求
  • 用于epoch_us()时间戳转换

执行缓慢

问题:连续查询耗时过长。

解决方案

  • 缩短每次执行的时间范围
  • 确保源测量数据已正确压缩。
  • 考虑对经常分组的列创建索引
  • 监控 DuckDB 查询性能

重复数据

问题:重新运行查询会创建重复的聚合。

解决方案

  • 重新执行前删除目标测量数据:
requests.post("/api/v1/delete", json={
"database": "telegraf",
"measurement": "cpu_hourly",
"where": f"time >= '{start_time}' AND time <= '{end_time}'"
})
  • 或者,如果支持语义化,则可以使用UPSERT语义化(未来功能)。

相关主题