
Parquet ETL with bloom filters

A complete analytical-pipeline shape: build an Arrow table from typed-array columns, write it as Parquet with bloom filters on the indexed columns, and demonstrate that the bloom + min/max stats actually let queries skip row groups they don’t need to scan. Self-contained — no fixtures, no external dataset.

This is @para/arrow showing what a real Parquet writer can do: zstd compression, multi-row-group output, bloom filters, predicate pushdown via row-group filter callbacks. None of it is reactive — Arrow is data-engineering, not UI.

```typescript
import arrow from "@para/arrow";
import { writeFile } from "node:fs/promises"; // available under both parabun/Bun and Node

const ROWS_PER_BATCH = 10_000;
const NUM_BATCHES = 5;
const REGIONS = ["us", "eu", "ap", "sa"];

// Build 5 RecordBatches → 5 Parquet row groups. Each batch covers a
// distinct user_id band so bloom-skip can demonstrate skipping 4/5
// row groups when querying a single user.
const schema = {
  fields: [
    { name: "user_id", type: { kind: "int32" }, nullable: false },
    { name: "region", type: { kind: "utf8" }, nullable: false },
    { name: "amount", type: { kind: "float64" }, nullable: false },
  ],
};

const batches = [];
for (let g = 0; g < NUM_BATCHES; g++) {
  const ids = new Int32Array(ROWS_PER_BATCH);
  const regions = new Array(ROWS_PER_BATCH);
  const amounts = new Float64Array(ROWS_PER_BATCH);
  const userBase = g * 10_000;
  for (let i = 0; i < ROWS_PER_BATCH; i++) {
    // 16807 is coprime to 10_000, so band g holds every id in
    // [userBase, userBase + 9_999] exactly once, in shuffled order.
    ids[i] = userBase + ((i * 16807) % 10_000);
    regions[i] = REGIONS[(g + i) % 4];
    // The g * 100 offset keeps each batch's amounts in their own band,
    // [g * 100, g * 100 + 99.98], so min/max stats can tell row groups apart.
    amounts[i] = g * 100 + ((g * 1000 + i * 31) % 9999) / 100;
  }
  batches.push(new arrow.RecordBatch(schema, [
    new arrow.Column({ kind: "int32" }, ROWS_PER_BATCH, ids),
    new arrow.Column({ kind: "utf8" }, ROWS_PER_BATCH, regions),
    new arrow.Column({ kind: "float64" }, ROWS_PER_BATCH, amounts),
  ], ROWS_PER_BATCH));
}

const table = new arrow.Table(schema, batches);
const bytes = arrow.toParquet(table, {
  compression: "zstd",
  multiRowGroup: true, // one row group per batch
  bloomFilters: ["user_id", "region"], // index for predicate pushdown
});
await writeFile("./events.parquet", bytes);

// Query 1: full decode (baseline).
const full = arrow.fromParquet(bytes);

// Query 2: bloom skip. targetUser = 25_421, which exists only in band 2
// (ids 20_000..29_999); bands 0/1/3/4 should bloom-miss and be skipped.
// A row group with no bloom filter is kept, since nothing rules it out.
const targetUser = 25_000 + ((3 * 16807) % 10_000);
const userQuery = arrow.fromParquet(bytes, {
  filter: rg => rg.bloomFilters.get("user_id")?.mightContain(targetUser) ?? true,
});

// Query 3: stats pushdown. Each band has a non-overlapping amount range,
// so only the row group whose [min, max] contains 0 is kept.
const targetAmount = 0;
const amountQuery = arrow.fromParquet(bytes, {
  filter: rg => {
    const s = rg.stats.get("amount");
    return !s || (s.min <= targetAmount && targetAmount <= s.max);
  },
});
```
Sample output (the logging and timing calls are not shown in the listing above):

```text
generating 50000 synthetic events across 5 batches…
built table in 214ms
writing parquet (zstd + multiRowGroup + bloom on user_id, region)…
wrote 412 KB in 318ms (8.4 bytes/row)
full decode: 50000 rows in 1280ms
bloom-skip user_id=…: kept 1 of 5 row groups, 10000 rows in 256ms ← 5×
stats-filter amount≈0: kept 1 of 5 row groups, 10000 rows in 251ms ← 5×
```

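Both targeted queries run about 5× faster than the full decode (roughly 255ms vs. 1280ms) because the filter callback sees only row-group metadata and rejects irrelevant row groups before any of their column data is decoded. Keeping 1 of 5 row groups means decoding about a fifth of the data.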
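To make the skip mechanism concrete, here is a minimal in-memory model of row-group pruning in plain TypeScript. It does not use @para/arrow: `RowGroup`, `scan`, and the `decode` thunk are hypothetical names, and the sketch assumes each group's `amount` values occupy a disjoint band, `[g * 100, g * 100 + 99.98]`. The point it illustrates is that a rejected group never reaches `decode()`, which is where a real reader spends its time.

```typescript
// Hypothetical model of a Parquet file: each row group carries per-column
// min/max stats plus a thunk standing in for the expensive column decode.
interface ColumnStats { min: number; max: number }

interface RowGroup {
  stats: Map<string, ColumnStats>;
  decode: () => Float64Array;
}

const ROWS_PER_GROUP = 10_000;
const NUM_GROUPS = 5;
let decodeCalls = 0;

// Assumption of this sketch: group g holds amounts in [g * 100, g * 100 + 99.98].
function makeGroup(g: number): RowGroup {
  const amounts = new Float64Array(ROWS_PER_GROUP);
  let min = Infinity;
  let max = -Infinity;
  for (let i = 0; i < ROWS_PER_GROUP; i++) {
    const v = g * 100 + ((i * 31) % 9999) / 100;
    amounts[i] = v;
    if (v < min) min = v;
    if (v > max) max = v;
  }
  return {
    stats: new Map([["amount", { min, max }]]),
    decode: () => {
      decodeCalls++; // count how many groups actually get "decoded"
      return amounts;
    },
  };
}

const groups = Array.from({ length: NUM_GROUPS }, (_, g) => makeGroup(g));

// Evaluate the filter on metadata first; only kept groups are decoded.
function scan(
  rgs: RowGroup[],
  keep: (rg: RowGroup) => boolean,
): { kept: number; rows: number } {
  let kept = 0;
  let rows = 0;
  for (const rg of rgs) {
    if (!keep(rg)) continue; // skipped before decode() is ever called
    kept++;
    rows += rg.decode().length;
  }
  return { kept, rows };
}

const target = 0;
const result = scan(groups, rg => {
  const s = rg.stats.get("amount");
  // No stats means nothing rules the group out, so it must be kept.
  return !s || (s.min <= target && target <= s.max);
});

console.log(`kept ${result.kept} of ${groups.length} row groups, ${result.rows} rows, decode calls: ${decodeCalls}`);
// kept 1 of 5 row groups, 10000 rows, decode calls: 1
```

The filter is conservative by design: it returns `false` only when the stats prove the predicate cannot match, so skipping never drops a matching row.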

  • arrow.Column is built straight from typed arrays, with no per-cell boxing: Int32Array for int32, Float64Array for float64, and a plain string array for utf8.
  • multiRowGroup: true — without this, the whole table goes into one Parquet row group and there’s nothing for the filter to skip. Multiple row groups are how Parquet trades file size for query-time skip-ability.
  • bloomFilters: [...] builds a Split-Block Bloom Filter per row group for each listed column. rg.bloomFilters.get("user_id")?.mightContain(value) returns false only when the value is definitely not in that row group; true means it might be. A false positive only costs one unnecessary row-group decode, and false negatives cannot happen, which is what makes skipping on false safe.
  • rg.stats exposes Parquet's per-column min/max/null-count statistics, used here to gate on the amount range. Stats are cheaper to check than a bloom filter, but they only prune well on orderable (numeric or string) columns whose values are clustered so that each row group covers a narrow, non-overlapping range.
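As a rough illustration of why a `false` from mightContain is trustworthy, here is a standalone TypeScript sketch of the split-block layout described in the Apache Parquet bloom-filter specification: 256-bit blocks of eight 32-bit words, where the hash's upper 32 bits pick a block and its lower 32 bits, multiplied by eight salts, set one bit per word. This is not @para/arrow's implementation. The spec hashes values with xxHash64; this sketch substitutes a splitmix64 mixer, so its bits will not match a real Parquet file. `SplitBlockBloomFilter` and `hash64` are illustrative names.

```typescript
// Salt constants from the Parquet split-block bloom filter layout.
const SALT = new Uint32Array([
  0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
  0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31,
]);
const MASK64 = (1n << 64n) - 1n;

// Stand-in hash (assumption): splitmix64 finalizer instead of xxHash64.
function hash64(key: number): bigint {
  let z = (BigInt(key) + 0x9e3779b97f4a7c15n) & MASK64;
  z = ((z ^ (z >> 30n)) * 0xbf58476d1ce4e5b9n) & MASK64;
  z = ((z ^ (z >> 27n)) * 0x94d049bb133111ebn) & MASK64;
  return z ^ (z >> 31n);
}

class SplitBlockBloomFilter {
  private readonly numBlocks: number;
  private readonly words: Uint32Array; // numBlocks * 8 words of 32 bits

  constructor(numBlocks: number) {
    this.numBlocks = numBlocks;
    this.words = new Uint32Array(numBlocks * 8);
  }

  // Upper 32 bits choose the block: floor(hi * numBlocks / 2^32).
  private blockOffset(h: bigint): number {
    const hi = Number(h >> 32n);
    return Math.floor((hi * this.numBlocks) / 2 ** 32) * 8;
  }

  // Lower 32 bits times each salt; the top 5 bits pick one bit per word.
  private static mask(h: bigint): number[] {
    const lo = Number(h & 0xffffffffn);
    const bits: number[] = [];
    for (let i = 0; i < 8; i++) {
      const y = Math.imul(lo, SALT[i]) >>> 0;
      bits.push((1 << (y >>> 27)) >>> 0);
    }
    return bits;
  }

  insert(key: number): void {
    const h = hash64(key);
    const off = this.blockOffset(h);
    const m = SplitBlockBloomFilter.mask(h);
    for (let i = 0; i < 8; i++) this.words[off + i] |= m[i];
  }

  // false: definitely absent. true: possibly present.
  mightContain(key: number): boolean {
    const h = hash64(key);
    const off = this.blockOffset(h);
    const m = SplitBlockBloomFilter.mask(h);
    for (let i = 0; i < 8; i++) {
      if ((this.words[off + i] & m[i]) === 0) return false;
    }
    return true;
  }
}

// Index one band of user ids, then probe ids from a different band.
const bf = new SplitBlockBloomFilter(512); // 512 blocks * 32 bytes = 16 KiB
for (let id = 20_000; id < 30_000; id++) bf.insert(id);

let falseNegatives = 0;
for (let id = 20_000; id < 30_000; id++) if (!bf.mightContain(id)) falseNegatives++;

let falsePositives = 0;
for (let id = 0; id < 10_000; id++) if (bf.mightContain(id)) falsePositives++;

console.log({ falseNegatives, falsePositiveRate: falsePositives / 10_000 });
```

falseNegatives is always 0, because every bit an inserted key sets stays set; the false-positive rate depends on how many bits per key the filter gets.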
```sh
parabun src/pipeline.ts
```

Cross-runtime — works in Node too with npm install @para/arrow and node src/pipeline.ts. The Parquet output is byte-compatible with apache-arrow 21.1.0; you can read it from Python (pyarrow.parquet.read_table), DuckDB, or anything else that speaks Parquet.
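Byte compatibility starts with the container framing from the Apache Parquet format specification: the 4-byte magic `PAR1` at both ends of the file, with a little-endian 4-byte footer (FileMetaData) length just before the trailing magic. The standalone check below is independent of @para/arrow (`inspectParquetFraming` is an illustrative name). It does not decode the Thrift-encoded metadata, so passing it is necessary but not sufficient for another reader to accept the file.

```typescript
// Verify Parquet framing: "PAR1" at start and end, plus a plausible footer length.
function inspectParquetFraming(bytes: Uint8Array): { ok: boolean; footerLength?: number } {
  const MAGIC = [0x50, 0x41, 0x52, 0x31]; // "PAR1"
  // Smallest possible frame: magic (4) + footer length (4) + magic (4).
  if (bytes.length < 12) return { ok: false };
  const hasMagic = (at: number) => MAGIC.every((b, i) => bytes[at + i] === b);
  if (!hasMagic(0) || !hasMagic(bytes.length - 4)) return { ok: false };
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  const footerLength = view.getUint32(bytes.length - 8, true); // little-endian
  // The footer must fit between the leading magic and the length field.
  if (footerLength > bytes.length - 12) return { ok: false };
  return { ok: true, footerLength };
}

// Synthetic frame: "PAR1" + 3 placeholder footer bytes + length 3 + "PAR1".
const fake = new Uint8Array([
  0x50, 0x41, 0x52, 0x31,
  1, 2, 3,
  3, 0, 0, 0,
  0x50, 0x41, 0x52, 0x31,
]);
const r = inspectParquetFraming(fake);
console.log(r.ok, r.footerLength); // true 3
```

Against the real file, pass it the bytes returned by toParquet, or read ./events.parquet back with readFile from node:fs/promises and wrap the result in a Uint8Array.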

  • @para/arrow — the full Arrow + IPC + Parquet API
  • @para/csv — pair with arrow.fromRows for CSV → Parquet pipelines
  • Streaming-ETL — the pure-numeric pipeline cousin