跳至主要內容

Stream API

本頁整理 T-TOONT-JSON 的逐行串流 API。可透過上方語言 tabs 在 Python、JavaScript 與 Rust 之間切換。

所有 streaming API 都需要 StreamSchema

共通格式慣例:

  • T-TOON stream[*]{fields}:
  • T-JSON stream:最外層為物件陣列
  • Object path:以語言原生物件逐列處理
  • Arrow path:以 Arrow batch 逐批處理

套件:ttoon

Reader Factories

函數回傳型別格式路徑
stream_read(source, *, schema, mode=None, codecs=None)StreamReaderT-TOONObject
stream_read_tjson(source, *, schema, mode=None, codecs=None)TjsonStreamReaderT-JSONObject
stream_read_arrow(source, *, schema, batch_size=1024, mode=None)ArrowStreamReaderT-TOONArrow
stream_read_arrow_tjson(source, *, schema, batch_size=1024, mode=None)TjsonArrowStreamReaderT-JSONArrow

所有 reader 都是 Python iterator:

for row in reader:
print(row)

對 T-JSON 串流 reader 而言,mode 不會放寬 JSON 值語法;它只控制 schema 外欄位的處理方式:compat 會略過,strict 會報錯。

Writer Factories

函數回傳型別格式路徑
stream_writer(sink, *, schema, delimiter=",", binary_format=None, codecs=None)StreamWriterT-TOONObject
stream_writer_tjson(sink, *, schema, binary_format=None, codecs=None)TjsonStreamWriterT-JSONObject
stream_writer_arrow(sink, *, schema, delimiter=",", binary_format=None)ArrowStreamWriterT-TOONArrow
stream_writer_arrow_tjson(sink, *, schema, binary_format=None)TjsonArrowStreamWriterT-JSONArrow

所有 writer 都支援 context manager:

with stream_writer(sink, schema=schema) as writer:
writer.write({"name": "Alice", "score": 95})
result = writer.result

Writer Methods 與結果

類別寫入方法備註
StreamWriterwrite(row: Mapping)物件列
TjsonStreamWriterwrite(row: Mapping)物件列
ArrowStreamWriterwrite_batch(batch)Arrow RecordBatch
TjsonArrowStreamWriterwrite_batch(batch)Arrow RecordBatch

StreamResult

屬性型別描述
rows_emittedint已寫入列數

Codec 作用範圍

use(codecs) -> None

為 Python object-path streaming APIs 註冊全域 codec。

Codec 會影響:

  • stream_read() / stream_writer()
  • stream_read_tjson() / stream_writer_tjson()

不影響批次 loads()、批次 to_tjson()、Arrow-path streaming 與 direct transcode。

Stream Schema

StreamSchema 定義串流操作中的欄位名稱與型別。所有串流讀寫器都需要 schema。

建立方式

from ttoon import StreamSchema, types

# 從 dict 建立
schema = StreamSchema({
"name": types.string,
"score": types.int,
"amount": types.decimal(10, 2),
})

# 從 list of tuples 建立(保留插入順序)
schema = StreamSchema([
("name", types.string),
("score", types.int),
])

Types 命名空間

型別PythonJavaScriptRust
Stringtypes.stringtypes.stringScalarType::String
Inttypes.inttypes.intScalarType::Int
Floattypes.floattypes.floatScalarType::Float
Booltypes.booltypes.boolScalarType::Bool
Datetypes.datetypes.dateScalarType::Date
Timetypes.timetypes.timeScalarType::Time
DateTime(帶時區)types.datetimetypes.datetimeScalarType::DateTime { has_tz: true }
DateTime(無時區)types.datetime_naivetypes.datetimeNaiveScalarType::DateTime { has_tz: false }
UUIDtypes.uuidtypes.uuidScalarType::Uuid
Binarytypes.binarytypes.binaryScalarType::Binary
Decimal(p, s)types.decimal(p, s)types.decimal(p, s)ScalarType::decimal(p, s)ScalarType::Decimal { precision, scale }

Rust 另提供便利建構子 ScalarType::datetime()ScalarType::datetime_naive()

可為 Null 的欄位

所有型別規格都支援 .nullable() 讓欄位可接受 null:

schema = StreamSchema({
"name": types.string, # NOT NULL
"nickname": types.string.nullable(), # nullable
})

Schema 存取

schema["name"]     # 傳回以 ttoon.types 建立的欄位規格
len(schema) # 欄位數量
list(schema) # 欄位名稱列表
schema.export() # 可序列化形式

驗證規則

三種語言都遵循相同的概念規則:

  • schema 至少要有一個欄位
  • 欄位名稱必須是字串
  • 不允許重複欄位名稱
  • 欄位型別必須來自各語言對應的 typed schema surface

各語言錯誤表現:

  • Python:名稱 / 型別不合法時拋 TypeError;重複或空 schema 拋 ValueError
  • JavaScript:名稱 / 型別不合法時拋 TypeError;重複或空 schema 拋 Error
  • Rust:StreamSchema::try_new() 回傳 ResultStreamSchema::new() 在非法輸入時 panic

Decimal 約束

decimal(precision, scale) 會轉交 Rust 後端處理。有效限制如下:

  • precision 必須介於 176
  • scale 必須符合 Rust i8
  • Arrow 轉換時,precision <= 38 會使用 Decimal128,其餘使用 Decimal256

超出範圍的值可能在 Python / JS 包裝層被建構成功,但最終仍會在 Rust 驗證或轉換時失敗。

Arrow Schema 轉換(Rust)

// StreamSchema -> Arrow Schema
let arrow_schema = schema.to_arrow_schema()?;

// Arrow Schema -> StreamSchema
let stream_schema = StreamSchema::from_arrow_schema(&arrow_schema)?;

相關頁面