tranfi (Python)
Streaming ETL in Python, powered by a native C11 core. Process CSV, JSONL, and text data with composable pipelines that run in constant memory, no matter how large the input.
import tranfi as tf
result = tf.pipeline([
    tf.codec.csv(),
    tf.ops.filter(tf.expr("col('age') > 25")),
    tf.ops.sort(['-age']),
    tf.ops.derive({'label': tf.expr("if(col('age')>30, 'senior', 'junior')")}),
    tf.ops.select(['name', 'age', 'label']),
    tf.codec.csv_encode(),
]).run(input=b'name,age\nAlice,30\nBob,25\nCharlie,35\nDiana,28\n')
print(result.output_text)
# name,age,label
# Charlie,35,senior
# Alice,30,junior
# Diana,28,junior
Or use the pipe DSL for one-liners:
result = tf.pipeline('csv | filter "col(age) > 25" | sort -age | csv').run(input_file='data.csv')
Install
pip install tranfi
Or from source:
mkdir -p build && cd build && cmake .. && make
pip install -e py/
CLI
Installing the package also installs the tranfi command:
# Filter and sort
tranfi 'csv | filter "age > 25" | sort -age | csv' < data.csv
# Built-in recipe
tranfi profile < data.csv
# File I/O
tranfi -i input.csv -o output.csv 'csv | select name,age | csv'
# List recipes
tranfi -R
Run tranfi -h for all options.
Quick start
Two APIs
Builder API -- composable, type-safe, IDE-friendly:
p = tf.pipeline([
    tf.codec.csv(),
    tf.ops.filter(tf.expr("col('score') >= 80")),
    tf.ops.derive({'grade': tf.expr("if(col('score')>=90, 'A', 'B')")}),
    tf.ops.sort(['-score']),
    tf.ops.head(10),
    tf.codec.csv_encode(),
])
result = p.run(input_file='students.csv')
DSL strings -- compact, suitable for CLI-like use:
p = tf.pipeline('csv | filter "col(score) >= 80" | sort -score | head 10 | csv')
result = p.run(input_file='students.csv')
Both produce identical pipelines under the hood.
Running pipelines
# From bytes
result = p.run(input=b'name,age\nAlice,30\n')
# From file (streamed in 64 KB chunks)
result = p.run(input_file='data.csv')
# Access results
result.output # bytes
result.output_text # str (UTF-8 decoded)
result.errors # bytes (error channel)
result.stats # bytes (pipeline stats)
result.stats_text # str
result.samples # bytes (sample channel)
Codecs
Codecs convert between raw bytes and columnar batches. Every pipeline starts with a decoder and ends with an encoder.
| Method | Description |
|---|---|
| `codec.csv(delimiter, header, batch_size, repair)` | CSV decoder. `repair=True` pads short / truncates long rows |
| `codec.csv_encode(delimiter)` | CSV encoder |
| `codec.jsonl(batch_size)` | JSON Lines decoder |
| `codec.jsonl_encode()` | JSON Lines encoder |
| `codec.text(batch_size)` | Line-oriented text decoder (single `_line` column) |
| `codec.text_encode()` | Text encoder |
| `codec.table_encode(max_width, max_rows)` | Pretty-print Markdown table |
Cross-codec pipelines work naturally:
# CSV in, JSONL out
tf.pipeline([tf.codec.csv(), tf.ops.head(5), tf.codec.jsonl_encode()])
# JSONL in, CSV out
tf.pipeline([tf.codec.jsonl(), tf.ops.sort(['name']), tf.codec.csv_encode()])
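The text codec treats each input line as a single _line column, which suits logs and other unstructured lines. A minimal sketch (the file name and pattern are illustrative):
# Keep only lines containing "ERROR" and write them back out as text
p = tf.pipeline([
    tf.codec.text(),
    tf.ops.grep('ERROR'),
    tf.codec.text_encode(),
])
result = p.run(input_file='app.log')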
Operators
Row filtering
| Method | Description |
|---|---|
| `ops.filter(expr)` | Keep rows matching expression |
| `ops.head(n)` | First N rows |
| `ops.tail(n)` | Last N rows |
| `ops.skip(n)` | Skip first N rows |
| `ops.top(n, column, desc=True)` | Top N by column value |
| `ops.sample(n)` | Reservoir sampling (uniform random) |
| `ops.grep(pattern, invert, column, regex)` | Substring/regex filter |
| `ops.validate(expr)` | Add `_valid` boolean column, keep all rows |
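These steps compose like any other. A small sketch (column and file names are illustrative):
p = tf.pipeline([
    tf.codec.csv(),
    tf.ops.filter(tf.expr("col('age') >= 18")),  # expression-based row filter
    tf.ops.sample(50),                           # uniform random sample of 50 rows
    tf.codec.csv_encode(),
])
result = p.run(input_file='users.csv')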
Column operations
| Method | Description |
|---|---|
| `ops.select(columns)` | Keep and reorder columns |
| `ops.rename(**mapping)` | Rename columns: `rename(name='full_name')` |
| `ops.derive(columns)` | Computed columns: `derive({'total': expr("col('a')*col('b')")})` |
| `ops.cast(**mapping)` | Type conversion: `cast(age='int', score='float')` |
| `ops.trim(columns)` | Strip whitespace |
| `ops.fill_null(**mapping)` | Replace nulls: `fill_null(age='0')` |
| `ops.fill_down(columns)` | Forward-fill nulls |
| `ops.clip(column, min, max)` | Clamp numeric values |
| `ops.replace(column, pattern, replacement, regex)` | String find/replace |
| `ops.hash(columns)` | Add `_hash` column (DJB2) |
| `ops.bin(column, boundaries)` | Discretize into bins |
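For example, a typical cleanup pass combining a few of these (column names are illustrative):
p = tf.pipeline([
    tf.codec.csv(),
    tf.ops.trim(['name']),                  # strip surrounding whitespace
    tf.ops.cast(age='int', score='float'),  # convert types
    tf.ops.fill_null(score='0'),            # replace nulls with a default
    tf.ops.rename(name='full_name'),        # rename a column
    tf.codec.csv_encode(),
])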
Sorting and deduplication
| Method | Description |
|---|---|
| `ops.sort(columns)` | Sort rows. Prefix `-` for descending: `sort(['-age', 'name'])` |
| `ops.unique(columns)` | Deduplicate on specified columns |
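For instance, deduplicating on one column before sorting (column names are illustrative):
tf.pipeline([
    tf.codec.csv(),
    tf.ops.unique(['email']),                # keep one row per email
    tf.ops.sort(['-signup_date', 'email']),  # newest first, then by email
    tf.codec.csv_encode(),
])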
Aggregation
| Method | Description |
|---|---|
| `ops.stats(stats_list)` | Column statistics. Stats: count, min, max, sum, avg, stddev, variance, median, p25, p75, p90, p99, distinct, hist, sample |
| `ops.frequency(columns)` | Value counts (descending) |
| `ops.group_agg(group_by, aggs)` | Group by + aggregate |
# Group aggregation
tf.ops.group_agg(['city'], [
    {'column': 'price', 'func': 'sum', 'result': 'total'},
    {'column': 'price', 'func': 'avg', 'result': 'avg_price'},
])
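Statistics and value counts follow the same pattern (the stat names come from the table above; the column name is illustrative):
# Summary statistics for every column
tf.pipeline([tf.codec.csv(), tf.ops.stats(['count', 'min', 'max', 'avg']), tf.codec.csv_encode()])
# Value counts for one column
tf.pipeline([tf.codec.csv(), tf.ops.frequency(['city']), tf.codec.csv_encode()])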
Sequential / window
| Method | Description |
|---|---|
| `ops.step(column, func, result)` | Running aggregation: running-sum, running-avg, running-min, running-max, lag |
| `ops.window(column, size, func, result)` | Sliding window: avg, sum, min, max |
| `ops.lead(column, offset, result)` | Lookahead N rows |
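A sketch of a running total plus a 7-row moving average (column and result names are illustrative):
tf.pipeline([
    tf.codec.csv(),
    tf.ops.step('amount', 'running-sum', 'amount_total'),  # cumulative sum
    tf.ops.window('amount', 7, 'avg', 'amount_ma7'),        # sliding 7-row average
    tf.codec.csv_encode(),
])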
Reshape
| Method | Description |
|---|---|
| `ops.explode(column, delimiter)` | Split delimited string into rows |
| `ops.split(column, names, delimiter)` | Split column into multiple columns |
| `ops.unpivot(columns)` | Wide to long (melt) |
| `ops.stack(file, tag, tag_value)` | Vertically concatenate another CSV file |
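For example, exploding a comma-separated tags column and splitting an ISO-style date (column names and data shape are assumed):
tf.pipeline([
    tf.codec.csv(),
    tf.ops.explode('tags', ','),                          # one output row per tag
    tf.ops.split('date', ['year', 'month', 'day'], '-'),  # "2024-01-15" -> three columns
    tf.codec.csv_encode(),
])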
Date/time
| Method | Description |
|---|---|
| `ops.datetime(column, extract)` | Extract parts: year, month, day, hour, minute, second, weekday |
| `ops.date_trunc(column, trunc, result)` | Truncate to: year, month, day, hour, minute, second |
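For example (the timestamp column name is illustrative):
tf.pipeline([
    tf.codec.csv(),
    tf.ops.datetime('created_at', 'month'),                  # extract the month
    tf.ops.date_trunc('created_at', 'day', 'created_day'),   # truncate to day precision
    tf.codec.csv_encode(),
])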
Other
| Method | Description |
|---|---|
| `ops.flatten()` | Flatten nested columns |
| `ops.reorder(columns)` | Alias for `select` |
| `ops.dedup(columns)` | Alias for `unique` |
Expressions
Used in filter, derive, and validate. Reference columns with col('name').
tf.ops.filter(tf.expr("col('age') > 25 and contains(col('name'), 'A')"))
tf.ops.derive({
    'full': tf.expr("concat(col('first'), ' ', col('last'))"),
    'grade': tf.expr("if(col('score')>=90, 'A', if(col('score')>=80, 'B', 'C'))"),
})
Available functions
| Category | Functions |
|---|---|
| Arithmetic | + - * / |
| Comparison | > >= < <= == != |
| Logic | and or not |
| String | upper(s) lower(s) initcap(s) len(s) trim(s) left(s,n) right(s,n) concat(a,b,...) replace(s,old,new) slice(s,start,len) pad_left(s,w) pad_right(s,w) |
| Predicates | starts_with(s,prefix) ends_with(s,suffix) contains(s,sub) |
| Conditional | if(cond,then,else) coalesce(a,b,...) nullif(a,b) |
| Math | abs(x) round(x) floor(x) ceil(x) sign(x) pow(x,y) sqrt(x) log(x) exp(x) mod(a,b) greatest(a,b,...) least(a,b,...) |
Aliases: substr=slice, length=len, lpad=pad_left, rpad=pad_right, min=least, max=greatest.
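These functions compose inside derive expressions like any other. A sketch (column names are illustrative):
tf.ops.derive({
    'display_name': tf.expr("coalesce(col('nickname'), col('name'))"),
    'score_clamped': tf.expr("least(greatest(col('score'), 0), 100)"),
    'price_rounded': tf.expr("round(col('price'))"),
})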
Recipes
Built-in named pipelines for common tasks. Use by name:
result = tf.pipeline('preview').run(input_file='data.csv')
result = tf.pipeline('freq').run(input_file='data.csv')
| Recipe | Pipeline | Description |
|---|---|---|
| `profile` | `csv \| stats \| csv` | Full data profiling |
| `preview` | `csv \| head 10 \| csv` | First 10 rows |
| `schema` | `csv \| head 0 \| csv` | Column names only |
| `summary` | `csv \| stats count,min,max,avg,stddev \| csv` | Summary statistics |
| `count` | `csv \| stats count \| csv` | Row count |
| `cardinality` | `csv \| stats count,distinct \| csv` | Unique value counts |
| `distro` | `csv \| stats min,p25,median,p75,max \| csv` | Five-number summary |
| `freq` | `csv \| frequency \| csv` | Value frequency |
| `dedup` | `csv \| dedup \| csv` | Remove duplicates |
| `clean` | `csv \| trim \| csv` | Trim whitespace |
| `sample` | `csv \| sample 100 \| csv` | Random 100 rows |
| `head` | `csv \| head 20 \| csv` | First 20 rows |
| `tail` | `csv \| tail 20 \| csv` | Last 20 rows |
| `csv2json` | `csv \| jsonl` | CSV to JSONL |
| `json2csv` | `jsonl \| csv` | JSONL to CSV |
| `tsv2csv` | `csv delimiter="\t" \| csv` | TSV to CSV |
| `csv2tsv` | `csv \| csv delimiter="\t"` | CSV to TSV |
| `look` | `csv \| table` | Pretty-print table |
| `histogram` | `csv \| stats hist \| csv` | Distribution histograms |
| `hash` | `csv \| hash \| csv` | Row hash for change detection |
| `samples` | `csv \| stats sample \| csv` | Sample values per column |
List all recipes programmatically:
for r in tf.recipes():
print(f"{r['name']:15} {r['description']}")
DuckDB engine
Run pipelines on DuckDB instead of the native C streaming core. The DSL is transpiled to SQL by the C core, then executed by DuckDB.
pip install tranfi[duckdb]
# Run a pipeline via DuckDB
p = tf.pipeline('csv | filter "age > 25" | sort -age | csv', engine='duckdb')
result = p.run(input_file='data.csv')
# Or with bytes input
result = tf.pipeline('csv | head 10 | csv', engine='duckdb').run(input=csv_bytes)
SQL transpilation
Generate SQL directly from DSL strings:
sql = tf.compile_to_sql('csv | filter "col(age) > 25" | sort -age | head 10 | csv')
print(sql)
# WITH
# step_1 AS (SELECT * FROM input_data WHERE ("age" > 25)),
# step_2 AS (SELECT * FROM step_1 ORDER BY "age" DESC LIMIT 10)
# SELECT * FROM step_2
Use this to run queries with your own DuckDB connection, or any SQL engine that supports CTE syntax.
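For example, a sketch using the duckdb Python package directly; it assumes the generated SQL reads from a relation named input_data (as in the output above) and loads it from a local data.csv:
import duckdb
import tranfi as tf

sql = tf.compile_to_sql('csv | filter "col(age) > 25" | sort -age | head 10 | csv')

con = duckdb.connect()
# Expose the CSV under the relation name the generated SQL expects
con.execute("CREATE VIEW input_data AS SELECT * FROM read_csv_auto('data.csv')")
print(con.execute(sql).fetchall())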
Advanced
DSL compilation
# Compile DSL to JSON plan
json_plan = tf.compile_dsl('csv | filter "col(age) > 25" | sort -age | csv')
# Save / load recipes
tf.save_recipe([tf.codec.csv(), tf.ops.head(10), tf.codec.csv_encode()], 'preview.tranfi')
p = tf.load_recipe('preview.tranfi')
result = p.run(input_file='data.csv')
Side channels
Every pipeline produces four output channels:
- output -- main pipeline result
- errors -- rows that failed processing
- stats -- pipeline execution statistics (rows in/out, timing)
- samples -- reserved for sampling operators
result = p.run(input_file='data.csv')
print(result.stats_text) # {"rows_in": 1000, "rows_out": 42, ...}
Pipeline from JSON
p = tf.pipeline(recipe='{"steps":[{"op":"codec.csv.decode","args":{}},{"op":"head","args":{"n":5}},{"op":"codec.csv.encode","args":{}}]}')
Architecture
The Python package is a thin ctypes wrapper around libtranfi.so, the same C11 core used by the CLI, Node.js, and WASM targets. Data flows through columnar batches with typed columns (bool, int64, float64, string, date, timestamp) and per-cell null bitmaps. All operators are streaming with bounded memory, except those that require full input (sort, unique, stats, tail, top, group-agg, frequency, pivot).