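Continuous Queries
Continuous queries automatically downsample and aggregate data into materialized views. This reduces storage requirements while keeping historical data queryable.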
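Overview
Continuous queries in IEDB help you:
- Downsample data: aggregate high-frequency data into lower-frequency summaries
- Reduce storage: store aggregated data instead of raw data
- Preserve history: keep long-term trends without storing full-granularity data
- Speed up queries: query pre-aggregated data for faster results
- Create materialized views: maintain aggregated datasets automatically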
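How It Works
A continuous query aggregates data from a source measurement into a destination measurement:
- Define the query: specify the aggregation logic in SQL
- Set the schedule: configure the time interval used for grouping (e.g., hourly, daily)
- Execute manually: trigger execution through the API with a start and end time
- Store results: write the aggregated data to a new measurement
- Apply retention: optionally set a custom retention policy for the aggregated data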
Data Flow
Source measurement (cpu)
↓
Continuous query (AVG, MAX, MIN, etc.)
↓
Destination measurement (cpu_hourly)
↓
Optional: retention policy
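To make the flow concrete, here is a minimal in-memory sketch of what a continuous query computes: raw points are grouped into hourly buckets per host and reduced to AVG, MAX, MIN and COUNT. IEDB expresses this in SQL; the plain-Python `downsample_hourly` function and the `(timestamp, host, value)` tuple layout are illustrative assumptions only.

```python
from collections import defaultdict
from datetime import datetime, timezone


def downsample_hourly(points):
    """Group (timestamp, host, value) points into hourly buckets per host.

    Returns one dict per (hour, host) bucket, sorted by time then host,
    mirroring the columns a continuous query writes to its destination.
    """
    buckets = defaultdict(list)
    for ts, host, value in points:
        # Truncate to the start of the hour, like date_trunc('hour', ...)
        bucket = ts.replace(minute=0, second=0, microsecond=0)
        buckets[(bucket, host)].append(value)

    rows = []
    for (bucket, host), values in sorted(buckets.items()):
        rows.append({
            "time": bucket,
            "host": host,
            "avg_value": sum(values) / len(values),
            "max_value": max(values),
            "min_value": min(values),
            "sample_count": len(values),
        })
    return rows


raw = [
    (datetime(2024, 1, 1, 10, 0, 5, tzinfo=timezone.utc), "web-1", 90.0),
    (datetime(2024, 1, 1, 10, 30, 0, tzinfo=timezone.utc), "web-1", 80.0),
    (datetime(2024, 1, 1, 11, 15, 0, tzinfo=timezone.utc), "web-1", 70.0),
]
for row in downsample_hourly(raw):
    print(row["time"].isoformat(), row["host"], row["avg_value"], row["sample_count"])
```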
API Reference
Create a Continuous Query
Define a new continuous query:
POST /api/v1/continuous_queries
Request body:
{
"name": "cpu_hourly_avg",
"database": "telegraf",
"source_measurement": "cpu",
"destination_measurement": "cpu_hourly",
"query": "SELECT time_bucket('1 hour', time) AS time, host, AVG(usage_idle) AS avg_usage_idle, AVG(usage_user) AS avg_usage_user, COUNT(*) AS sample_count FROM telegraf.cpu GROUP BY time_bucket('1 hour', time), host",
"interval": "1h",
"retention_policy": "90d",
"is_active": true
}
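Parameters:
- name (string, required): unique query identifier
- database (string, required): target database name
- source_measurement (string, required): source measurement to aggregate
- destination_measurement (string, required): measurement where the results are stored
- query (string, required): SQL aggregation query
- interval (string, required): time bucket interval (1m, 5m, 1h, 1d, etc.)
- retention_policy (string, optional): retention period for the aggregated data (e.g., 90d, 365d)
- is_active (boolean, required): enables or disables the query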
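Before sending a create or update request, it can help to check the body on the client side. The sketch below checks the required fields and types listed above plus the duration format; the `validate_cq_payload` helper and the assumed duration pattern (a positive integer followed by `s`, `m`, `h` or `d`, matching the examples in this guide) are illustrative, not IEDB's actual server-side rules.

```python
import re

# Required fields and their expected Python types, per the parameter list
REQUIRED_FIELDS = {
    "name": str,
    "database": str,
    "source_measurement": str,
    "destination_measurement": str,
    "query": str,
    "interval": str,
    "is_active": bool,
}
# Assumed duration format: positive integer + unit, e.g. "5m", "1h", "90d"
DURATION_RE = re.compile(r"[1-9][0-9]*[smhd]")


def validate_cq_payload(payload):
    """Return a list of problems found in a request body (empty if none)."""
    errors = []
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in payload:
            errors.append(f"missing required field: {field}")
        elif not isinstance(payload[field], expected_type):
            errors.append(f"{field} must be {expected_type.__name__}")
    interval = payload.get("interval")
    if isinstance(interval, str) and not DURATION_RE.fullmatch(interval):
        errors.append(f"invalid interval: {interval!r}")
    retention = payload.get("retention_policy")
    if retention is not None and not (
        isinstance(retention, str) and DURATION_RE.fullmatch(retention)
    ):
        errors.append(f"invalid retention_policy: {retention!r}")
    return errors


body = {
    "name": "cpu_hourly_avg",
    "database": "telegraf",
    "source_measurement": "cpu",
    "destination_measurement": "cpu_hourly",
    "query": "SELECT ...",
    "interval": "1h",
    "retention_policy": "90d",
    "is_active": True,
}
print(validate_cq_payload(body))
```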
List Continuous Queries
Retrieve a list of all continuous queries:
GET /api/v1/continuous_queries
Response:
[
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "cpu_hourly_avg",
"database": "telegraf",
"source_measurement": "cpu",
"destination_measurement": "cpu_hourly",
"interval": "1h",
"retention_policy": "90d",
"is_active": true,
"created_at": "2024-01-15T10:30:00Z",
"last_executed_at": "2024-01-20T02:00:00Z"
}
]
Get a Single Query
Retrieve a specific continuous query:
GET /api/v1/continuous_queries/{query_id}
Update a Continuous Query
Update an existing continuous query:
PUT /api/v1/continuous_queries/{query_id}
Request body: same as for creating a query
Delete a Continuous Query
Delete a continuous query by ID:
DELETE /api/v1/continuous_queries/{query_id}
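Warning: Deleting a continuous query does not delete the destination measurement or its data. The aggregated data remains queryable.
Execute a Continuous Query
Trigger a continuous query manually: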
POST /api/v1/continuous_queries/{query_id}/execute
Request body:
{
"start_time": "2024-01-01T00:00:00Z",
"end_time": "2024-01-31T23:59:59Z",
"dry_run": false
}
Parameters:
- start_time (string, required): start timestamp (ISO 8601)
- end_time (string, required): end timestamp (ISO 8601)
- dry_run (boolean, optional): run the query without writing data (default: false)
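Both timestamps must be ISO 8601. The hypothetical `build_execute_body` helper below formats timezone-aware datetimes as UTC in the same `YYYY-MM-DDTHH:MM:SSZ` shape used in the examples, and rejects naive datetimes and empty or inverted ranges before anything reaches the server. It only builds the JSON body; sending it is left to your HTTP client.

```python
from datetime import datetime, timezone


def to_iso_z(dt):
    """Format an aware datetime as UTC ISO 8601 with a trailing 'Z'."""
    if dt.tzinfo is None:
        raise ValueError("use timezone-aware datetimes to avoid ambiguity")
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_execute_body(start, end, dry_run=False):
    """Build the JSON body for POST /api/v1/continuous_queries/{query_id}/execute."""
    start_s, end_s = to_iso_z(start), to_iso_z(end)  # validates tz first
    if start >= end:
        raise ValueError("start_time must be earlier than end_time")
    return {"start_time": start_s, "end_time": end_s, "dry_run": dry_run}


body = build_execute_body(
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 31, 23, 59, 59, tzinfo=timezone.utc),
    dry_run=True,
)
print(body)
```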
Response:
{
"query_id": "550e8400-e29b-41d4-a716-446655440000",
"rows_processed": 1000000,
"rows_written": 720,
"execution_time_ms": 2500,
"time_range": {
"start": "2024-01-01T00:00:00Z",
"end": "2024-01-31T23:59:59Z"
},
"dry_run": false
}
View Execution History
View past executions of a continuous query:
GET /api/v1/continuous_queries/{query_id}/executions?limit=50
Response:
[
{
"execution_id": "abc123",
"executed_at": "2024-01-20T02:00:00Z",
"start_time": "2024-01-19T00:00:00Z",
"end_time": "2024-01-20T00:00:00Z",
"rows_processed": 86400,
"rows_written": 24,
"execution_time_ms": 1200,
"status": "success"
}
]
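The history response makes it easy to spot failures or slowdowns. The sketch below summarizes a list shaped like the response above. It assumes that any `status` other than `"success"` indicates a problem, which this reference does not spell out; `summarize_executions` is a hypothetical helper.

```python
def summarize_executions(executions):
    """Summarize execution records returned by the history endpoint."""
    failed = [e for e in executions if e.get("status") != "success"]
    times = [e["execution_time_ms"] for e in executions if "execution_time_ms" in e]
    processed = sum(e.get("rows_processed", 0) for e in executions)
    written = sum(e.get("rows_written", 0) for e in executions)
    return {
        "runs": len(executions),
        "failed_ids": [e.get("execution_id") for e in failed],
        "avg_execution_time_ms": sum(times) / len(times) if times else None,
        # Raw rows per aggregated row; None when nothing was written
        "reduction_ratio": processed / written if written else None,
    }


history = [
    {"execution_id": "abc123", "executed_at": "2024-01-20T02:00:00Z",
     "rows_processed": 86400, "rows_written": 24,
     "execution_time_ms": 1200, "status": "success"},
]
print(summarize_executions(history))
```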
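Query Syntax
Continuous queries are written in SQL, with time-oriented functions for bucketing data.
Recommended Approach
Use epoch_us() for timestamp conversion and date_trunc() for time bucketing: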
SELECT
date_trunc('hour', epoch_us(time)) AS time,
host,
AVG(usage_idle) AS avg_usage_idle,
MAX(usage_user) AS max_usage_user,
MIN(usage_system) AS min_usage_system,
COUNT(*) AS sample_count
FROM telegraf.cpu
GROUP BY date_trunc('hour', epoch_us(time)), host
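Supported Aggregations
- AVG(): average
- SUM(): sum of values
- MIN(): minimum
- MAX(): maximum
- COUNT(): number of rows
- STDDEV(): standard deviation
- PERCENTILE_CONT(): percentile calculation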
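PERCENTILE_CONT interpolates between neighbouring values instead of returning an existing one. Under the usual SQL definition, percentile p sits at position p × (n − 1) in the sorted values, with linear interpolation between the two surrounding values. The plain-Python sketch below reproduces that rule so you can sanity-check aggregated results; it is an illustration, not IEDB or DuckDB code.

```python
import math


def percentile_cont(values, p):
    """Continuous percentile with linear interpolation (SQL PERCENTILE_CONT)."""
    if not values:
        return None
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be between 0 and 1")
    ordered = sorted(values)
    pos = p * (len(ordered) - 1)          # fractional index into sorted data
    lower, upper = math.floor(pos), math.ceil(pos)
    if lower == upper:
        return float(ordered[lower])
    frac = pos - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * frac


print(percentile_cont([10, 20, 30, 40], 0.5))
print(percentile_cont([120, 80, 95, 300, 110], 0.95))
```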
Time Bucketing
Use date_trunc():
-- Hourly
date_trunc('hour', epoch_us(time))
-- Daily
date_trunc('day', epoch_us(time))
-- Every 5 minutes
date_trunc('hour', epoch_us(time)) + INTERVAL '5 minutes' * floor(extract(minute from epoch_us(time)) / 5)
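The 5-minute expression truncates each timestamp to the hour, then adds back the minutes rounded down to a multiple of 5. The Python sketch below performs the same arithmetic, generalized to any bucket size that divides 60 evenly, which is handy for checking which bucket a timestamp lands in. `bucket_start` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def bucket_start(ts, minutes=5):
    """Return the start of the N-minute bucket containing ts.

    Mirrors: date_trunc('hour', ts) + INTERVAL 'N minutes' * floor(minute / N)
    """
    if minutes <= 0 or 60 % minutes != 0:
        raise ValueError("bucket size must divide 60 evenly")
    hour_start = ts.replace(minute=0, second=0, microsecond=0)
    return hour_start + timedelta(minutes=minutes * (ts.minute // minutes))


print(bucket_start(datetime(2024, 1, 1, 10, 17, 42)))
print(bucket_start(datetime(2024, 1, 1, 10, 59, 59), minutes=15))
```

The divisibility check matters because the expression is anchored to the hour: with a size such as 7 minutes, the last bucket of every hour would be shorter than the others.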
Sample Counts
Always include COUNT(*) to record how many raw samples each aggregate represents:
SELECT
date_trunc('hour', epoch_us(time)) AS time,
host,
AVG(usage_idle) AS avg_usage_idle,
COUNT(*) AS sample_count -- Important for data quality
FROM telegraf.cpu
GROUP BY date_trunc('hour', epoch_us(time)), host
Usage Examples
Example 1: Hourly CPU Metrics
Aggregate per-second CPU metrics into hourly averages:
import requests
from datetime import datetime, timedelta, timezone
# Create the continuous query
response = requests.post(
    "http://localhost:8000/api/v1/continuous_queries",
    headers={"Authorization": "Bearer YOUR_TOKEN"},
    json={
        "name": "cpu_hourly",
        "database": "telegraf",
        "source_measurement": "cpu",
        "destination_measurement": "cpu_hourly",
        "query": """
            SELECT
                date_trunc('hour', epoch_us(time)) AS time,
                host,
                AVG(usage_idle) AS avg_usage_idle,
                AVG(usage_user) AS avg_usage_user,
                AVG(usage_system) AS avg_usage_system,
                MAX(usage_user) AS max_usage_user,
                COUNT(*) AS sample_count
            FROM telegraf.cpu
            GROUP BY date_trunc('hour', epoch_us(time)), host
        """,
        "interval": "1h",
        "retention_policy": "365d",
        "is_active": True,
    },
)
response.raise_for_status()
query_id = response.json()["id"]
# Aggregate the last 30 days (timezone-aware UTC timestamps)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=30)
result = requests.post(
    f"http://localhost:8000/api/v1/continuous_queries/{query_id}/execute",
    headers={"Authorization": "Bearer YOUR_TOKEN"},
    json={
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    },
)
result.raise_for_status()
stats = result.json()
print(f"Processed {stats['rows_processed']} rows")
print(f"Generated {stats['rows_written']} aggregated rows")
Example 2: Daily Request Summary
Aggregate API request logs into daily summaries:
response = requests.post(
"http://localhost:8000/api/v1/continuous_queries",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"name": "requests_daily",
"database": "logs",
"source_measurement": "api_requests",
"destination_measurement": "api_requests_daily",
"query": """
SELECT
date_trunc('day', epoch_us(time)) AS time,
endpoint,
status_code,
COUNT(*) AS total_requests,
AVG(response_time_ms) AS avg_response_time,
MAX(response_time_ms) AS max_response_time,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time_ms) AS p95_response_time
FROM api_requests
GROUP BY date_trunc('day', epoch_us(time)), endpoint, status_code
""",
"interval": "1d",
"retention_policy": "730d", # 2 years
"is_active": True
}
)
Example 3: 5-Minute Sensor Readings
Downsample IoT sensor data to 5-minute intervals:
response = requests.post(
"http://localhost:8000/api/v1/continuous_queries",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"name": "sensors_5min",
"database": "iot",
"source_measurement": "temperature",
"destination_measurement": "temperature_5min",
"query": """
SELECT
date_trunc('hour', epoch_us(time)) +
INTERVAL '5 minutes' * floor(extract(minute from epoch_us(time)) / 5) AS time,
sensor_id,
location,
AVG(temperature) AS avg_temperature,
MIN(temperature) AS min_temperature,
MAX(temperature) AS max_temperature,
COUNT(*) AS sample_count
FROM temperature
GROUP BY
date_trunc('hour', epoch_us(time)) +
INTERVAL '5 minutes' * floor(extract(minute from epoch_us(time)) / 5),
sensor_id,
location
""",
"interval": "5m",
"retention_policy": "90d",
"is_active": True
}
)
Example 4: Dry-Run Testing
Test a continuous query before running it for real:
# Create the query
response = requests.post(
"http://localhost:8000/api/v1/continuous_queries",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={...}
)
query_id = response.json()["id"]
# Dry run: report what would be written without writing it
dry_run = requests.post(
f"http://localhost:8000/api/v1/continuous_queries/{query_id}/execute",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"start_time": "2024-01-01T00:00:00Z",
"end_time": "2024-01-02T00:00:00Z",
"dry_run": True
}
)
print(f"Would process {dry_run.json()['rows_processed']} rows")
print(f"Would generate {dry_run.json()['rows_written']} aggregated rows")
# Execute for real, but only if the dry run produced output
if dry_run.json()["rows_written"] > 0:
    result = requests.post(
        f"http://localhost:8000/api/v1/continuous_queries/{query_id}/execute",
        headers={"Authorization": "Bearer YOUR_TOKEN"},
        json={
            "start_time": "2024-01-01T00:00:00Z",
            "end_time": "2024-01-02T00:00:00Z",
            "dry_run": False
        }
    )
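Storage Benefits
Continuous queries can significantly reduce storage requirements:
Before Downsampling
Raw CPU metrics (1-second interval):
- 1 year = 31,536,000 rows per host
- 10 hosts = 315,360,000 rows
- Storage: ~20 GB
After Hourly Downsampling
Hourly aggregates:
- 1 year = 8,760 rows per host
- 10 hosts = 87,600 rows
- Storage: ~50 MB
Reduction: roughly 400× less storage (3,600× fewer rows), while hourly trends remain visible.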
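The row counts above follow directly from the sampling interval, as the short sketch below shows. The byte figures (~20 GB and ~50 MB) depend on schema and compression, so they are not derived here; `rows_per_year` is a hypothetical helper assuming a 365-day year.

```python
SECONDS_PER_YEAR = 365 * 24 * 60 * 60  # 31,536,000


def rows_per_year(interval_seconds, series=1):
    """Rows produced in one 365-day year at a fixed interval for N series."""
    return SECONDS_PER_YEAR // interval_seconds * series


raw_rows = rows_per_year(1, series=10)         # 1-second raw data, 10 hosts
hourly_rows = rows_per_year(3600, series=10)   # hourly aggregates, 10 hosts
print(f"raw: {raw_rows:,} rows, hourly: {hourly_rows:,} rows")
print(f"row reduction: {raw_rows // hourly_rows:,}x")
```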
Multi-Tier Strategy
Combine different granularities to balance storage cost against detail:
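import requests
BASE_URL = "http://localhost:8000"
HEADERS = {"Authorization": "Bearer YOUR_TOKEN"}
# Tier 1: raw data, kept for 7 days
# Tier 2: hourly aggregates, kept for 90 days
# (other required fields such as database, source_measurement and query are omitted for brevity)
requests.post(f"{BASE_URL}/api/v1/continuous_queries", headers=HEADERS, json={
    "name": "cpu_hourly",
    "interval": "1h",
    "retention_policy": "90d"
})
# Tier 3: daily aggregates, kept for 2 years
requests.post(f"{BASE_URL}/api/v1/continuous_queries", headers=HEADERS, json={
    "name": "cpu_daily",
    "source_measurement": "cpu_hourly",  # re-aggregate the hourly aggregates
    "destination_measurement": "cpu_daily",
    "interval": "1d",
    "retention_policy": "730d"
})
# Use a retention policy to delete raw data after 7 days
requests.post(f"{BASE_URL}/api/v1/retention", headers=HEADERS, json={
    "database": "telegraf",
    "measurement": "cpu",
    "retention_days": 7
})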
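When the daily tier re-aggregates the hourly tier, a plain AVG over the hourly averages weights every hour equally, even hours with few samples. Weighting each hourly average by its sample_count recovers the true daily mean; in SQL that is SUM(avg_usage_idle * sample_count) / SUM(sample_count). The Python sketch below shows the arithmetic. The row layout (ISO 8601 `time` strings and the column names from Example 1) is an assumption, and `rollup_daily` is a hypothetical helper.

```python
from collections import defaultdict


def rollup_daily(hourly_rows):
    """Combine hourly aggregates into daily ones, weighting AVG by sample_count."""
    acc = defaultdict(lambda: {"weighted_sum": 0.0, "samples": 0, "max": None})
    for row in hourly_rows:
        day = row["time"][:10]  # "YYYY-MM-DD" prefix of an ISO 8601 string
        a = acc[(day, row["host"])]
        a["weighted_sum"] += row["avg_usage_idle"] * row["sample_count"]
        a["samples"] += row["sample_count"]
        # The maximum of hourly maxima is still the true daily maximum
        hour_max = row["max_usage_user"]
        a["max"] = hour_max if a["max"] is None else max(a["max"], hour_max)
    return [
        {
            "time": day,
            "host": host,
            "avg_usage_idle": a["weighted_sum"] / a["samples"],
            "max_usage_user": a["max"],
            "sample_count": a["samples"],  # COUNT becomes SUM(sample_count)
        }
        for (day, host), a in sorted(acc.items())
        if a["samples"] > 0
    ]


hourly = [
    {"time": "2024-01-01T00:00:00Z", "host": "web-1",
     "avg_usage_idle": 90.0, "max_usage_user": 20.0, "sample_count": 3},
    {"time": "2024-01-01T01:00:00Z", "host": "web-1",
     "avg_usage_idle": 50.0, "max_usage_user": 45.0, "sample_count": 1},
]
print(rollup_daily(hourly))
```

Here the weighted daily average is 80.0, whereas averaging the two hourly averages directly would give 70.0.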
Best Practices
1. Start Conservatively
Start with a longer interval, then adjust it as needed:
# Start with hourly granularity
{"interval": "1h"}
# If that is too coarse, switch to 15 minutes
{"interval": "15m"}
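2. Keep Raw Data at First
Keep the raw data while you validate the aggregates:
# Create the continuous query
# (create_query, execute_dry_run and execute_for_real stand for the API calls shown in the examples above)
create_query(...)
# Test the aggregation thoroughly
execute_dry_run(...)
execute_for_real(...)
# Only after validation, apply a retention policy to the raw data
requests.post("/api/v1/retention", json={
"measurement": "cpu",
"retention_days": 30  # keep raw data for 30 days
})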
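3. Use Dry Runs
Always dry-run a query before executing it for real:
# Start with a small time range
# (dry_run and execute stand for the execute endpoint with dry_run set to true and false)
dry_run(start="2024-01-01", end="2024-01-02")
# Gradually widen the range
dry_run(start="2024-01-01", end="2024-01-07")
# Finally, run the full range
execute(start="2024-01-01", end="2024-12-31")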
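The progression above can be automated: dry-run over widening windows, stop if any window would write nothing, and only then run for real. The sketch takes the execute call as a parameter, `execute_fn`, a hypothetical callable that would wrap `POST /api/v1/continuous_queries/{query_id}/execute` and return its JSON response, so the logic can be exercised without a server.

```python
def staged_rollout(execute_fn, windows, full_range):
    """Dry-run each (start, end) window in order, then execute full_range.

    execute_fn(start, end, dry_run) must return a dict containing
    'rows_written', like the execute endpoint's response. Returns the
    final response, or None if any dry run would write no rows.
    """
    for start, end in windows:
        result = execute_fn(start, end, True)
        if result["rows_written"] == 0:
            print(f"Dry run {start}..{end} would write nothing; stopping")
            return None
    start, end = full_range
    return execute_fn(start, end, False)


def fake_execute(start, end, dry_run):
    # Stand-in for the real API call; always reports 24 aggregated rows
    return {"rows_written": 24, "dry_run": dry_run}


final = staged_rollout(
    fake_execute,
    windows=[("2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z"),
             ("2024-01-01T00:00:00Z", "2024-01-08T00:00:00Z")],
    full_range=("2024-01-01T00:00:00Z", "2024-12-31T23:59:59Z"),
)
print(final)
```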
4. Record Sample Counts
Track how many raw samples each aggregate represents:
SELECT
date_trunc('hour', epoch_us(time)) AS time,
COUNT(*) AS sample_count, -- Essential for data quality
AVG(value) AS avg_value
FROM measurement
GROUP BY date_trunc('hour', epoch_us(time))
This helps identify:
- Missing data (unusually low sample counts)
- Data quality problems
- Unexpected behavior
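For example, with 1-second source data each complete hourly bucket should hold about 3,600 samples. The sketch below flags buckets whose sample_count falls below a chosen fraction of that expectation; the 90% threshold, the row layout and the `find_sparse_buckets` name are assumptions to adapt to your data.

```python
def find_sparse_buckets(rows, interval_seconds, bucket_seconds, min_fraction=0.9):
    """Return rows whose sample_count is below min_fraction of the expected count."""
    expected = bucket_seconds // interval_seconds
    threshold = expected * min_fraction
    return [row for row in rows if row["sample_count"] < threshold]


hourly = [
    {"time": "2024-01-01T00:00:00Z", "sample_count": 3600},
    {"time": "2024-01-01T01:00:00Z", "sample_count": 1200},  # likely a gap
]
for row in find_sparse_buckets(hourly, interval_seconds=1, bucket_seconds=3600):
    print("sparse bucket:", row["time"], row["sample_count"])
```

A bucket with no samples at all produces no row, so completely missing hours appear as gaps in the time column rather than as low counts.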
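5. Monitor Execution Performance
Monitor how long continuous query executions take:
result = execute_query(...)  # JSON response from the execute endpoint
print(f"Execution time: {result['execution_time_ms']}ms")
seconds = result['execution_time_ms'] / 1000
if seconds > 0:  # avoid division by zero for very fast runs
    print(f"Throughput: {result['rows_processed'] / seconds:.0f} rows/sec")
# Alert if execution takes too long
if result['execution_time_ms'] > 60000:  # 1 minute
    print("Warning: Slow execution!")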
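6. Use Appropriate Intervals
Match the interval to the characteristics of your data:
High-frequency data (IoT sensors at 1-second intervals):
- 5-minute aggregates for recent analysis
- Hourly aggregates for the medium term
- Daily aggregates for long-term trends
Medium-frequency data (application metrics at 1-minute intervals):
- Hourly aggregates for recent analysis
- Daily aggregates for the long term
Low-frequency data (hourly business metrics):
- Daily aggregates
- Monthly aggregates for multi-year analysis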
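Troubleshooting
No Rows Written
Problem: Execution returns rows_written: 0.
Solutions:
- Verify that the source measurement contains data in the specified time range
- Check that the query syntax is correct
- Make sure the GROUP BY clause lists every non-aggregated column in the SELECT
- Use a dry run to inspect the query results
Query Syntax Errors
Problem: Execution fails with an SQL error.
Solutions:
- Test the query directly with the /query endpoint
- Verify that the column names exist in the source measurement
- Check DuckDB-specific syntax requirements
- Use epoch_us() for timestamp conversion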
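Slow Execution
Problem: A continuous query takes too long to run.
Solutions:
- Shorten the time range of each execution
- Make sure the source measurement data is properly compressed
- Consider creating indexes on frequently grouped columns
- Monitor DuckDB query performance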
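One way to shorten each execution is to split a long backfill into consecutive windows and run them one after another. The hypothetical `split_range` helper below follows the inclusive `...T23:59:59Z` end-time convention of the examples, ending each window one second before the next begins; if your timestamps have sub-second precision, reduce `gap` accordingly. Choose a step that is a multiple of the query's interval so that no bucket is split across two executions, which would otherwise produce two partial rows for it.

```python
from datetime import datetime, timedelta


def split_range(start, end, step, gap=timedelta(seconds=1)):
    """Split an inclusive [start, end] range into consecutive windows.

    Each window ends `gap` before the next one starts; the last window
    is clipped to `end`.
    """
    if step <= gap:
        raise ValueError("step must be larger than gap")
    windows = []
    window_start = start
    while window_start <= end:
        window_end = min(window_start + step - gap, end)
        windows.append((window_start, window_end))
        window_start += step
    return windows


for s, e in split_range(datetime(2024, 1, 1), datetime(2024, 1, 31, 23, 59, 59),
                        timedelta(days=7)):
    print(s.isoformat(), "->", e.isoformat())
```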
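Duplicate Data
Problem: Re-running a query creates duplicate aggregates.
Solutions:
- Delete the affected time range from the destination measurement before re-executing: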
requests.post("/api/v1/delete", json={
"database": "telegraf",
"measurement": "cpu_hourly",
"where": f"time >= '{start_time}' AND time <= '{end_time}'"
})
- Alternatively, use UPSERT semantics once they are supported (a planned feature).
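Until upsert semantics are available, re-running a window safely means deleting the destination rows for that window first and then executing again. The sketch below chains those two calls through an injected `post(path, body)` callable (hypothetical; in practice a thin wrapper around `requests.post` that adds your base URL and token) and uses the same inclusive time filter as the delete request above. Keep `start_time` aligned to a bucket boundary so the deleted and re-written buckets match, and pass only trusted timestamp strings, since they are interpolated into the `where` clause.

```python
def rerun_window(post, query_id, database, destination, start_time, end_time):
    """Delete existing aggregates in [start_time, end_time], then re-execute.

    post(path, body) is assumed to send a JSON POST and return the parsed
    response; start_time/end_time are ISO 8601 strings.
    """
    post("/api/v1/delete", {
        "database": database,
        "measurement": destination,
        "where": f"time >= '{start_time}' AND time <= '{end_time}'",
    })
    return post(f"/api/v1/continuous_queries/{query_id}/execute", {
        "start_time": start_time,
        "end_time": end_time,
        "dry_run": False,
    })


sent = []


def fake_post(path, body):
    # Records each request instead of calling a real server
    sent.append((path, body))
    return {"rows_written": 24}


resp = rerun_window(fake_post, "550e8400-e29b-41d4-a716-446655440000",
                    "telegraf", "cpu_hourly",
                    "2024-01-19T00:00:00Z", "2024-01-19T23:59:59Z")
for path, _ in sent:
    print(path)
```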