跳到主要内容

数据删除

IEDB 支持使用基于重写的方法删除数据,从而实现精确删除,且写入和查询操作的开销为零。

需要配置 为了安全起见,必须在配置中明确启用删除操作。

概述

IEDB 的删除操作提供:

  • 精确控制:使用 WHERE 子句删除特定行
  • 零运行时开销:对写入或查询性能无影响
  • 物理删除:通过重写 Parquet 文件永久删除数据。
  • 安全机制:多重安全措施防止意外删除
  • 预运行模式:执行前测试操作

工作原理

IEDB 采用基于重写的删除方法:

1. 查找受影响的文件

使用 DuckDB 扫描测量目录,以识别包含与 WHERE 子句匹配的行的 Parquet 文件。

2. 重写文件

对于每个受影响的文件:

  1. 加载文件到 Arrow 表中
  2. 筛选出匹配的行:SELECT * FROM table WHERE NOT (delete_clause)
  3. 将筛选后的数据写入临时文件
  4. 使用原子方式替换原始文件os.replace()

3. 清理

  • 过滤后变为空的文件将被完全删除。
  • 包含剩余数据的文件将被重写后的版本替换。
  • 所有操作均采用原子文件替换以确保数据完整性

原子安全

删除过程中系统崩溃会导致旧文件或新文件存在,绝不会造成文件损坏或部分写入。

配置

删除操作必须明确启用和配置。

配置文件

编辑conf

[delete]
enabled = true
confirmation_threshold = 10000
max_rows_per_delete = 1000000

配置参数

  • enabled(布尔值):启用/禁用删除功能(默认值false
  • confirmation_threshold(整数):需要明确确认的行数(默认值10000
  • max_rows_per_delete(整数):每次操作允许的最大行数(默认值1000000

API 接口

删除数据

执行删除操作:

POST /api/v1/delete

请求正文

{
"database": "telegraf",
"measurement": "cpu",
"where": "host = 'server01' AND time < '2024-01-01'",
"dry_run": false,
"confirm": false
}

参数

  • database(字符串,必填):目标数据库名称
  • measurement(字符串,必填):目标测量表名称
  • where(字符串,必填):用于删除的 SQL WHERE 子句
  • dry_run(布尔值,可选):测试而不删除(默认值false:)
  • confirm(布尔值,可选):确认大型操作(默认值false:)

回复

{
"deleted_count": 15000,
"affected_files": 3,
"rewritten_files": 2,
"deleted_files": 1,
"execution_time_ms": 1250,
"files": [
{
"path": "/data/telegraf/cpu/2023-12-15.parquet",
"action": "rewritten",
"rows_before": 10000,
"rows_after": 5000
},
{
"path": "/data/telegraf/cpu/2023-12-20.parquet",
"action": "deleted",
"rows_before": 5000,
"rows_after": 0
}
]
}

获取配置

获取当前删除配置:

GET /api/v1/delete/config

回复

{
"enabled": true,
"confirmation_threshold": 10000,
"max_rows_per_delete": 1000000
}

安全机制

1. 需要 WHERE 条件

删除操作必须包含 WHERE 子句,以防止意外删除整个表。

故意完全删除

{
"where": "1=1" // Explicitly delete all rows
}

2. 确认阈值

超出设定阈值的操作需要明确确认:

{
"where": "time < '2024-01-01'",
"confirm": true // Required if deleted_count > threshold
}

未经确认

{
"error": "Operation would delete 15000 rows, exceeding threshold of 10000. Set confirm=true to proceed."
}

3. 最大行数限制

硬性上限限制了可能耗尽资源的超大规模操作:

{
"error": "Operation would delete 2000000 rows, exceeding maximum of 1000000"
}

4. 原子文件替换

文件以原子方式替换os.replace(),确保:

  • 不允许部分写入。
  • 无数据损坏
  • 崩溃恢复(旧文件或新文件都存在)

使用示例

示例 1:删除旧数据

import requests

# 删除早于指定日期的数据
response = requests.post(
"http://localhost:8000/api/v1/delete",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"database": "telegraf",
"measurement": "cpu",
"where": "time < '2024-01-01'"
}
)

print(f"Deleted {response.json()['deleted_count']} rows")
print(f"Execution time: {response.json()['execution_time_ms']}ms")

示例 2:删除特定主机数据

# 删除特定主机的数据
response = requests.post(
"http://localhost:8000/api/v1/delete",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"database": "telegraf",
"measurement": "cpu",
"where": "host = 'server01' OR host = 'server02'"
}
)

示例 3:预运行模式删除

# 删除前始终使用试运行测试
dry_run = requests.post(
"http://localhost:8000/api/v1/delete",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"database": "telegraf",
"measurement": "cpu",
"where": "host = 'server01'",
"dry_run": True
}
)

print(f"将删除 {dry_run.json()['deleted_count']} 行")
print(f"受影响文件数:{dry_run.json()['affected_files']}")

# 查看将要受影响的文件
for file in dry_run.json()['files']:
print(f" {file['path']}: {file['rows_before']} -> {file['rows_after']} 行")

# 确认无误后,实际执行
if input("是否继续?(yes/no): ") == "yes":
result = requests.post(
"http://localhost:8000/api/v1/delete",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"database": "telegraf",
"measurement": "cpu",
"where": "host = 'server01'",
"dry_run": False
}
)
print(f"已删除 {result.json()['deleted_count']} 行")

示例 4:确认删除

# 大删除操作需要确认
response = requests.post(
"http://localhost:8000/api/v1/delete",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"database": "telegraf",
"measurement": "cpu",
"where": "time < '2023-01-01'",
"confirm": True # 显式确认大操作
}
)

示例 5:复杂的 WHERE 子句

# 基于多个条件删除数据
response = requests.post(
"http://localhost:8000/api/v1/delete",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"database": "telegraf",
"measurement": "cpu",
"where": """
host IN ('server01', 'server02', 'server03')
AND time BETWEEN '2023-01-01' AND '2023-06-30'
AND usage_idle < 10
"""
}
)

性能特征

删除操作计算量很大,但设计用于不频繁使用:

执行时间

小文件(10MB):

  • 读取 + 过滤 + 写入:每个文件约 50-100 毫秒

中等大小的文件(100MB):

  • 读取 + 过滤 + 写入:每个文件约 500 毫秒至 1 秒

大文件(1GB):

  • 读取 + 过滤 + 写入:每个文件约 2-5 秒

性能因素

  • 文件大小:较大的文件需要更长时间才能重写。
  • 选择性:删除的行数越少,速度越快(数据移动量越少)
  • 存储 I/O:磁盘速度影响读/写性能
  • 并发负载:其他操作可能会减慢删除速度。

最佳实践

1. 默认禁用

仅在需要时启用删除操作:

[delete]
enabled = false

2. 务必进行预运行模式

执行操作前先进行测试以验证其作用范围:

# 步骤1:试运行
result = requests.post(..., json={"dry_run": True})
print(f"将删除 {result['deleted_count']} 行")

# 步骤2:查看受影响的文件
for file in result['files']:
print(f"{file['path']}: {file['action']}")

# 步骤3:确认后执行
result = requests.post(..., json={"dry_run": False, "confirm": True})

3. 考虑保留策略

对于基于时间的删除,请改用保留策略

# 替代手动删除:
# requests.post("/api/v1/delete", json={"where": "time < '2024-01-01'"})

# 使用保留策略:
requests.post("/api/v1/retention", json={
"retention_days": 90,
"buffer_days": 7
})

4. 监控执行时间

跟踪删除性能以进行容量规划:

import time

start = time.time()
result = requests.post("/api/v1/delete", json={...})
elapsed = time.time() - start

print(f"Deleted {result['deleted_count']} rows in {elapsed:.2f}s")

5. 批量删除

按时间范围将大批量删除操作拆分成较小的批次:

from datetime import datetime, timedelta

# 替代一次性大量删除:
# WHERE time < '2023-01-01' # 时间早于 '2023-01-01'

# 按月分批:
start = datetime(2022, 1, 1)
while start < datetime(2023, 1, 1):
end = start + timedelta(days=30)

requests.post("/api/v1/delete", json={
"where": f"time >= '{start.isoformat()}' AND time < '{end.isoformat()}'"
})

start = end

6. 了解存储的影响

删除操作会重写文件,这可能会暂时增加存储空间使用量:

# 删除前:100MB 原始文件
# 删除过程中:100MB 原始文件 + 60MB 临时文件 = 160MB
# 删除后:60MB 重写后的文件

操作期间请确保有足够的磁盘空间用于存放临时文件。

局限性

不适用于频繁操作

删除操作会重写整个 Parquet 文件,因此开销很大。它们仅适用于不频繁的手动操作

使用案例

  • 删除测试数据
  • 删除特定主机/传感器
  • 一次性清理作业

不适用于

  • 自动定期删除(使用保留策略)
  • 高频数据清理
  • 实时数据删除

明确指出需要的地方

删除整表需要明确指定WHERE 1=1

# 以下操作会失败:
{"where": ""} # 错误:需要 WHERE 子句

# 显式全量删除:
{"where": "1=1", "confirm": True}

最大行数限制

大批量删除操作需进行max_rows_per_delete配置:

# 若超出限制则会失败:
{"where": "time < '2020-01-01'"} # 可能超过最大行数限制

# 解决方案:按时间范围分批处理
{"where": "time >= '2023-01-01' AND time < '2023-02-01'"}

文件级锁定

删除过程中,受影响的文件将被锁定。并发写入操作可能会延迟。

故障排除

删除功能未启用

问题DELETE_ENABLED=false或未配置。

解决方案

[delete]
enabled = true

需要确认

问题:操作超出确认阈值。

解决方案:添加confirm: true

{"confirm": true}

超出最大行数

问题:删除操作将影响比预期更多的行max_rows_per_delete

解决方案

  1. 按时间范围批量执行操作
  2. max_rows_per_delete(谨慎地)增加
  3. 使用保留策略进行大规模清理

未删除任何行

问题deleted_count: 0但删除操作符合预期。

解决方案

  • 验证 WHERE 子句语法是否与数据匹配
  • 检查指定数据库/测量表中是否存在数据
  • 使用预运行模式检查受影响的文件

执行缓慢

问题:删除操作耗时比预期长。

解决方案

  • 检查文件大小(大文件处理时间较长)
  • 监控磁盘 I/O 性能
  • 低流量时段的批量操作
  • 考虑使用基于时间的保留策略进行清理

相关主题