
Streaming Guide

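TTOON provides 8 streaming reader and writer combinations across two formats (T-TOON, T-JSON) and two paths (object, Arrow). Every streaming operation requires a StreamSchema that defines the field names and types.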

Overview

| | T-TOON object | T-TOON Arrow | T-JSON object | T-JSON Arrow |
|---|---|---|---|---|
| Reader | StreamReader | ArrowStreamReader | TjsonStreamReader | TjsonArrowStreamReader |
| Writer | StreamWriter | ArrowStreamWriter | TjsonStreamWriter | TjsonArrowStreamWriter |

Schema Definition

All streaming operations start with a StreamSchema.

Python

from ttoon import StreamSchema, types

schema = StreamSchema({
    "name": types.string,
    "score": types.int,
    "amount": types.decimal(10, 2),
    "active": types.bool.nullable(),
})

JavaScript / TypeScript

import { StreamSchema, types } from '@ttoon/shared';

const schema = new StreamSchema({
  name: types.string,
  score: types.int,
  amount: types.decimal(10, 2),
  active: types.bool.nullable(),
});

Rust

use ttoon_core::{StreamSchema, FieldType, ScalarType};

let schema = StreamSchema::new([
    ("name", FieldType::new(ScalarType::String)),
    ("score", FieldType::new(ScalarType::Int)),
    ("amount", FieldType::new(ScalarType::Decimal { precision: 10, scale: 2 })),
    ("active", FieldType::nullable(ScalarType::Bool)),
]);

Available Types

| Type | Python | JavaScript | Description |
|---|---|---|---|
| String | types.string | types.string | String |
| Int | types.int | types.int | Integer |
| Float | types.float | types.float | Floating-point number |
| Bool | types.bool | types.bool | Boolean |
| Date | types.date | types.date | Date |
| Time | types.time | types.time | Time |
| DateTime | types.datetime | types.datetime | Date and time with time zone |
| DateTime Naive | types.datetime_naive | types.datetimeNaive | Date and time without time zone |
| UUID | types.uuid | types.uuid | UUID |
| Binary | types.binary | types.binary | Binary data |
| Decimal | types.decimal(p, s) | types.decimal(p, s) | Decimal (precision, scale) |

Every type supports .nullable() to allow null values.
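To make the effect of .nullable() concrete, here is a minimal pure-Python sketch of the kind of check a schema might apply to each row. `FieldSpec` and `validate_row` are hypothetical names used only for illustration and are not part of the ttoon API. The sketch also assumes that a non-nullable field rejects None and that a missing key counts as None, neither of which is stated above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldSpec:
    """Hypothetical stand-in for a schema field: a Python type plus a nullable flag."""
    py_type: type
    nullable: bool = False


def validate_row(schema: dict, row: dict) -> None:
    """Raise if a value is None for a non-nullable field or has the wrong type."""
    for name, spec in schema.items():
        value = row.get(name)  # assumption: a missing key is treated like None
        if value is None:
            if not spec.nullable:
                raise ValueError(f"field {name!r} is not nullable")
            continue
        if not isinstance(value, spec.py_type):
            raise TypeError(f"field {name!r} expects {spec.py_type.__name__}")


schema = {
    "name": FieldSpec(str),
    "active": FieldSpec(bool, nullable=True),
}

# Accepted: "active" is declared nullable, so None is allowed there.
validate_row(schema, {"name": "Alice", "active": None})
```

Passing `{"name": None, "active": True}` to the same function raises ValueError, because "name" was not declared nullable.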

T-TOON Streaming

Writing

A T-TOON stream uses [*]{fields}: as its header. The * marks an unbounded stream, as opposed to [N], which is used for batches with a fixed number of rows.
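The header convention can be illustrated with a small pure-Python parser. This is a sketch only: `parse_header` is a hypothetical helper, not a ttoon function, and it assumes field names contain no commas, braces, or quoting, which the real T-TOON grammar may not guarantee.

```python
import re
from typing import List, Optional, Tuple

# "[*]" marks an unbounded stream, "[N]" a batch of exactly N rows.
HEADER_RE = re.compile(r"^\[(\*|\d+)\]\{([^{}]*)\}:$")


def parse_header(line: str) -> Tuple[Optional[int], List[str]]:
    """Return (row_count, field_names); row_count is None for an unbounded stream."""
    match = HEADER_RE.match(line.strip())
    if match is None:
        raise ValueError(f"not a tabular header: {line!r}")
    count, fields = match.groups()
    row_count = None if count == "*" else int(count)
    names = [f.strip() for f in fields.split(",")] if fields else []
    return row_count, names


print(parse_header("[*]{name,score}:"))  # (None, ['name', 'score'])
print(parse_header("[2]{name,score}:"))  # (2, ['name', 'score'])
```

Returning None for the row count keeps the unbounded case distinct from a batch that declares zero rows.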

Python

import ttoon
from ttoon import StreamSchema, types

schema = StreamSchema({"name": types.string, "score": types.int})

# Open the file in its own context so it is closed once the stream ends
with open("out.ttoon", "w") as f, ttoon.stream_writer(f, schema=schema) as writer:
    writer.write({"name": "Alice", "score": 95})
    writer.write({"name": "Bob", "score": 87})

print(writer.result.rows_emitted)  # 2

Output:

[*]{name,score}:
"Alice", 95
"Bob", 87

JavaScript / TypeScript

import { streamWriter, StreamSchema, types } from '@ttoon/shared';

const schema = new StreamSchema({ name: types.string, score: types.int });
const chunks: string[] = [];

const writer = streamWriter((chunk) => chunks.push(chunk), { schema });
writer.write({ name: 'Alice', score: 95 });
writer.write({ name: 'Bob', score: 87 });
const result = await writer.close();
console.log(result.rowsEmitted); // 2

Reading

Python

# Open the file in its own context so it is closed after iteration
with open("data.ttoon") as source:
    for row in ttoon.stream_read(source, schema=schema):
        print(row)  # {"name": "Alice", "score": 95}

JavaScript / TypeScript

import { streamRead, StreamSchema, types } from '@ttoon/shared';

const schema = new StreamSchema({ name: types.string, score: types.int });

for await (const row of streamRead(source, { schema })) {
  console.log(row); // { name: "Alice", score: 95 }
}

The JS reader accepts a TextSource: string, Iterable<string | Uint8Array>, AsyncIterable<string | Uint8Array>, or ReadableStreamLike<string | Uint8Array>.

T-JSON Streaming

A T-JSON stream is formatted as a top-level JSON array of objects.

Writing

Python

with ttoon.stream_writer_tjson(sink, schema=schema) as writer:
    writer.write({"name": "Alice", "score": 95})
    writer.write({"name": "Bob", "score": 87})

Output:

[{"name": "Alice", "score": 95}
,{"name": "Bob", "score": 87}
]
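The leading-comma layout in this output suits incremental writing: each row can be flushed as soon as it arrives, and the array is closed only at the end. The sketch below reproduces that layout with the standard json module. `JsonArrayWriterSketch` is a hypothetical class, not the ttoon writer, and it handles plain JSON values only, so typed values such as decimals or UUIDs are outside its scope.

```python
import io
import json
from typing import TextIO


class JsonArrayWriterSketch:
    """Hypothetical writer that emits a top-level JSON array one row at a time."""

    def __init__(self, sink: TextIO) -> None:
        self._sink = sink
        self.rows_emitted = 0

    def write(self, row: dict) -> None:
        # The first row opens the array; every later row starts with a comma.
        prefix = "[" if self.rows_emitted == 0 else ","
        self._sink.write(prefix + json.dumps(row) + "\n")
        self.rows_emitted += 1

    def close(self) -> int:
        # An empty stream still has to produce a valid JSON array.
        self._sink.write("]\n" if self.rows_emitted else "[]\n")
        return self.rows_emitted


buf = io.StringIO()
writer = JsonArrayWriterSketch(buf)
writer.write({"name": "Alice", "score": 95})
writer.write({"name": "Bob", "score": 87})
writer.close()
print(buf.getvalue(), end="")
```

Because the separator precedes each row instead of following it, the writer never needs to know whether another row is coming.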

JavaScript / TypeScript

import { streamWriterTjson, StreamSchema, types } from '@ttoon/shared';

const writer = streamWriterTjson(sink, { schema });
writer.write({ name: 'Alice', score: 95 });
await writer.close();

Reading

Python

for row in ttoon.stream_read_tjson(source, schema=schema):
    print(row)

For the T-JSON stream reader, mode only affects how fields outside the schema are handled. It does not relax the JSON value syntax itself.

JavaScript / TypeScript

for await (const row of streamReadTjson(source, { schema })) {
  console.log(row);
}

Arrow Streaming

Arrow stream readers yield RecordBatch objects, and Arrow stream writers accept RecordBatch objects.

Writing

Python

with ttoon.stream_writer_arrow(sink, schema=schema) as writer:
    writer.write_batch(record_batch)

# T-JSON variant
with ttoon.stream_writer_arrow_tjson(sink, schema=schema) as writer:
    writer.write_batch(record_batch)

JavaScript / TypeScript

import { streamWriterArrow, StreamSchema, types } from '@ttoon/shared';

const writer = streamWriterArrow(sink, { schema });
writer.writeBatch(recordBatch);
await writer.close();

Reading

Python

for batch in ttoon.stream_read_arrow(source, schema=schema, batch_size=1024):
    print(batch)  # pyarrow.RecordBatch
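To show what a row-count batch size means for a stream of unknown length, the following pure-Python sketch groups rows into lists of at most batch_size items. `batched_rows` is a hypothetical helper, not part of ttoon, and it yields plain lists rather than RecordBatch objects. It also assumes the final batch may be shorter than batch_size, which the guide does not spell out.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batched_rows(rows: Iterable[dict], batch_size: int = 1024) -> Iterator[List[dict]]:
    """Yield consecutive lists of at most batch_size rows from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # source exhausted
        yield batch


# A generator stands in for a stream whose length is not known up front.
rows = ({"name": f"user{i}", "score": i} for i in range(2500))
sizes = [len(batch) for batch in batched_rows(rows, batch_size=1024)]
print(sizes)  # [1024, 1024, 452]
```

Only one batch is held in memory at a time, which is the usual motivation for reading a large stream in batches.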

JavaScript / TypeScript

for await (const batch of streamReadArrow(source, { schema, batchSize: 1024 })) {
  console.log(batch); // RecordBatch
}

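Options

Writer Options

schema (T-TOON writers: required; T-JSON writers: required): field definitions.
delimiter: ",", "\t", `"
binary_format / binaryFormat: "hex" or "b64".
codecs (T-TOON writers: object writers only; T-JSON writers: object writers only): overrides codecs.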

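Reader Options

schema (all readers: required; Arrow readers: required): field definitions.
mode: "compat" or "strict". For T-JSON streams, it mainly controls how fields outside the schema are handled.
codecs (object readers only): overrides codecs.
batch_size / batchSize: number of rows per Arrow batch (default 1024).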

JS Source/Sink Flexibility

JS streams accept several source and sink types:

TextSource: string | Iterable<string | Uint8Array> | AsyncIterable<string | Uint8Array> | ReadableStreamLike<string | Uint8Array>

TextSink: ((chunk: string) => void | Promise<void>) | { write(chunk: string): void | Promise<void> } | WritableStreamLike<string>

This means you can use a callback, a Node.js stream, a Web Stream, or any object with a .write() method.

StreamResult

Every writer returns a StreamResult when it is closed.

| Language | Access | Property |
|---|---|---|
| Python | writer.result or writer.close() | rows_emitted: int |
| JS | writer.result or await writer.close() | rowsEmitted: number |
| Rust | writer.close() | rows_emitted: usize |
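The Python access pattern, where the result is available both from close() and from writer.result after a with block, can be sketched as a small context manager. `WriterSketch` and `ResultSketch` are hypothetical classes written for this illustration and do not reflect how ttoon is implemented. The sketch quotes values with json.dumps, on the assumption that this matches T-TOON syntax for strings and integers only.

```python
import io
import json
from dataclasses import dataclass
from typing import Optional, Sequence, TextIO


@dataclass(frozen=True)
class ResultSketch:
    """Hypothetical counterpart of StreamResult."""
    rows_emitted: int


class WriterSketch:
    """Hypothetical writer that records its result when it is closed."""

    def __init__(self, sink: TextIO, fields: Sequence[str]) -> None:
        self._sink = sink
        self._fields = list(fields)
        self._rows = 0
        self.result: Optional[ResultSketch] = None
        self._sink.write("[*]{" + ",".join(self._fields) + "}:\n")

    def write(self, row: dict) -> None:
        if self.result is not None:
            raise RuntimeError("writer is already closed")
        # Assumption: json.dumps quoting is adequate for strings and ints only.
        cells = (json.dumps(row[name]) for name in self._fields)
        self._sink.write(", ".join(cells) + "\n")
        self._rows += 1

    def close(self) -> ResultSketch:
        # Closing twice returns the same result instead of raising.
        if self.result is None:
            self.result = ResultSketch(self._rows)
        return self.result

    def __enter__(self) -> "WriterSketch":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


buf = io.StringIO()
with WriterSketch(buf, ["name", "score"]) as writer:
    writer.write({"name": "Alice", "score": 95})
    writer.write({"name": "Bob", "score": 87})

print(writer.result.rows_emitted)  # 2
```

Storing the result on the writer is what makes it reachable after the with block, since the value returned by close() inside `__exit__` is otherwise discarded.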