压缩机制深度解析
每一个分析型数据库都有个“不那么体面”的秘密:写入速度与查询速度往往天然对立。写入想要小而频繁的落盘;查询却偏好大而有序的文件。IotEdgeDB 的 compaction(压缩)系统,正是连接这两端的桥梁。 这篇文章会带你看清它到底怎么运作。
下面我们来看看它到底是怎么工作的。
问题:小文件过多,迟早会拖垮系统
IotEdgeDB 大约每 5 秒就会把内存中的缓冲区刷新成一个 Parquet 文件(这个间隔可以通过 max_buffer_age_ms 配置)。这样做的好处是数据落盘快,可靠性高;但代价也很明显:如果某个指标持续高频写入,一小时内可能会产生成百上千个小文件。
例如,某个小时内磁盘上的文件可能长这样:
metrics/cpu/2026/04/15/14/cpu_20260415_140005_123456789.parquet (42 KB)
metrics/cpu/2026/04/15/14/cpu_20260415_140010_987654321.parquet (38 KB)
metrics/cpu/2026/04/15/14/cpu_20260415_140015_456789123.parquet (41 KB)
...(还有 100 多个文件)
每个文件本身都是合法的 Parquet 文件,也包含完整的元数据、压缩和表结构信息。但当你执行 SELECT * FROM metrics.cpu WHERE time > now() - INTERVAL '24 hours' 这类查询时,DuckDB 就不得不逐个打开这些文件、读取元数据并扫描内容。对于本来应该很快的查询来说,这样的文件系统开销实在太大了。
压缩机制就是为了解决这个问题。
两级压缩:按小时压缩,再按天压缩
IotEdgeDB 采用了两层压缩模型。这不是为了“炫技”,而是对问题形态最自然的拆解。
小时压缩
小时压缩每小时执行一次,默认在整点后的第 5 分钟触发。它会把某个小时分区里所有零散的小文件合并成一个文件。
# 压缩前(100+ 个文件,总大小约 4 MB)
metrics/cpu/2026/04/15/14/cpu_20260415_140005_*.parquet
metrics/cpu/2026/04/15/14/cpu_20260415_140010_*.parquet
...
# 压缩后(1 个文件,总大小约 4 MB,且已经按时间排序)
metrics/cpu/2026/04/15/14/cpu_20260415_140500_compacted.parquet
按天压缩
日级压缩每天执行一次,默认在凌晨 3 点触发。它会把一天内所有已经小时压缩过的文件再合并成一个更大的文件。
# 压缩前(24 个小时级文件,总大小约 100 MB)
metrics/cpu/2026/04/15/00/cpu_*_compacted.parquet
metrics/cpu/2026/04/15/01/cpu_*_compacted.parquet
...
metrics/cpu/2026/04/15/23/cpu_*_compacted.parquet
# 压缩后(1 个文件,每天 1 个)
metrics/cpu/2026/04/15/cpu_20260416_030000_daily.parquet
最终效果是:近期数据保留在少量小时级文件中,适合查询“今天”的时间范围;历史数据则被收敛到日级文件中,更适合跨周、跨月的分析查询。
一次压缩任务到底做了什么
每个压缩任务都会经历一个 5 阶段流水线:
-
下载 —— 并行下载(4 个 worker)把对象存储中的文件拉到本地临时目录。无论数据存放在本地磁盘、S3 还是 Azure Blob,这一步都适用。
-
校验 —— 做一次快速的 Parquet 特征码检查:总共只读取 8 个字节,分别检查文件头和文件尾是否都是
PAR1。这里不会用 DuckDB 的read_parquet()做校验,因为那会把整个文件加载进内存;单文件还好,成百上千个文件就很危险了。 -
合并 —— 这一步由 DuckDB 负责真正的重活:
COPY (
SELECT *
FROM read_parquet(
['file1.parquet', 'file2.parquet', ...],
union_by_name = true
)
ORDER BY time
)
TO 'output_compacted.parquet'
(
FORMAT PARQUET,
COMPRESSION ZSTD,
COMPRESSION_LEVEL 3,
ROW_GROUP_SIZE 122880
)
这里有几个关键点:
union_by_name=true用来处理 schema 演进。如果某个 measurement 上周二新增了一列,那么旧文件里没有这列也没关系,DuckDB 会自动补成NULL。ORDER BY time(或者你自定义的排序键)能保证输出文件按查询友好的顺序排列。ROW_GROUP_SIZE 122880(每组约 12 万行)在 DuckDB 的读取粒度和内存占用之间取得平衡。ZSTD压缩级别 3 能在压缩率和 CPU 开销之间取得很好的折中。
-
上传 —— 将压缩后的文件流式上传回存储。
-
删除 —— 删除原始文件。只有在第 2 步通过校验的文件,才会进入删除流程;我们绝不会删除那些没有成功压缩的文件。
为什么要放到子进程里跑
这里有一个我们花了很久才摸清楚的细节:DuckDB 默认使用 jemalloc 作为内存分配器。jemalloc 性能很好,也能减少内存碎片,但它有个特点——即使你关闭了连接,也不会把所有内存完全还给操作系统,而是保留在 arena 里复用。
如果压缩任务直接跑在长期运行的 API 进程里,每一次压缩都会留下些内存痕迹。跑个几十次之后,进程里可能就积累了好几 GB“已经不用了、但还占着”的内存。放在有内存限制的容器里,这就是典型的 OOM 事故前兆。
我们的解决方案很直接:每个压缩任务都放到子进程里执行。
IotEdgeDB(主进程) → 启动 → IotEdgeDB compact --job-stdin(子进程)
↓
新建 DuckDB 连接
执行 5 阶段流水线
退出 → 所有内存归还给操作系统
主进程把任务配置序列化成 JSON 写到标准输入,子进程负责执行压缩,把结果写到标准输出,然后退出。进程一结束,操作系统就会回收所有资源——包括 jemalloc arena、DuckDB 缓冲区、Arrow 数组等等。
这样还有一个额外好处:如果某个压缩任务 OOM 了,也不会把整个 API 服务拖死。主进程只需要记录失败日志,然后继续处理下一个分区即可。
自动去重:零开销的重复数据剔除
如果你在写入数据时使用了标签列(比如 host、region、device_id),IotEdgeDB 会把这些元数据存到 Parquet 文件的 footer 中。压缩阶段如果检测到这些标签元数据,就会自动做去重:
SELECT * EXCLUDE (__dedup_rn)
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY "host", "region", "time"
ORDER BY "time" DESC
) AS __dedup_rn
FROM read_parquet([...], union_by_name = true)
)
WHERE __dedup_rn = 1
ORDER BY time
这就是典型的“Last-write-wins”语义。如果同一个 (host, region, time) 组合在多个文件里重复出现了——可能来自重试、延迟到达的数据,或者重叠的写入窗口——最终只会保留最新的一条记录,而且这个过程是静默完成的,不需要额外配置。
通过清单文件做崩溃恢复
如果 IotEdgeDB 在压缩过程中崩溃了怎么办?答案是:我们会在上传压缩结果之前先写一份清单文件(manifest)。
_compaction_state/hourly/metrics/cpu_2026_04_15_14_{jobID}.json
这份清单会记录当前正在压缩哪些输入文件,以及输出文件会放到哪里。不同阶段出问题时,系统的行为如下:
- 上传前崩溃:只有 manifest 存在,输出文件还没生成。下一轮会忽略这份 manifest,从头重试。
- 上传后、删除前崩溃:manifest 和输出文件都在。下一轮会先校验输出文件,然后补做输入文件删除。
- 删除后:manifest 被清理掉,整个流程结束。
这样一来,压缩任务就具备了幂等性。无论你在哪个时点崩溃,都可以安全重启,不会重复写入,也不会丢数据。
配置示例
下面是一份实用的配置,展示了一些最常用的参数:
compaction:
hourly_schedule: "5 * * * *" # 每小时的 :05 执行
daily_schedule: "0 3 * * *" # 每天凌晨 3 点执行
hourly_min_files: 10 # 少于 10 个文件先不压缩
daily_min_files: 12 # 约半天的小时文件量再压缩
hourly_min_age_hours: 1 # 等待写入结束
daily_min_age_hours: 24 # 等待整天数据都到齐
max_concurrent: 2 # 并行任务数(CPU 密集型)
daily_skip_file_age_check_days: 7 # 跳过冷数据更新检查
ingest:
sort_keys:
- "cpu:host,time" # cpu 数据按 host、time 排序
- "network:interface,time" # network 数据按 interface、time 排序
default_sort_keys: "time" # 其他数据默认按 time 排序
API:手动触发与监控
你不必完全依赖调度器,也可以手动触发压缩:
curl -X POST http://localhost:8000/api/v1/compaction/trigger \
-H "Authorization: Bearer $TOKEN" \
-d 'tier=hourly&database=metrics'
查看当前状态也很方便:
# 当前状态
curl http://localhost:8000/api/v1/compaction/status
# 历史统计
curl http://localhost:8000/api/v1/compaction/stats
# 当前有哪些分区满足压缩条件
curl http://localhost:8000/api/v1/compaction/candidates
其中 /candidates 特别实用:它会扫描存储,列出当前哪些分区已经积累到足够的文件数,可以立即压缩。这样你就能决定是手动触发,还是继续等调度器执行。
为什么这件事很重要
如果没有压缩,一个每秒写入 10,000 条记录的 measurement,大约每小时就会生成 720 个文件。跑上一周,单个 measurement 的文件数就会超过 120,000 个。查询会变得越来越慢,S3 的 LIST 调用也会越来越贵,云账单更是会迅速变得难看。
而有了两级压缩,同样一周的数据最终只会落成 7 个日级文件。无论你已经持续写入了多久,查询性能都能保持稳定。
子进程模型确保压缩不会影响查询延迟;清单机制确保它不会丢数据;自动去重则让你不用再担心重试写入带来的重复记录。
这也许不是数据库里最“酷”的部分,但它却是系统在规模化场景下依然稳定运行的关键。
