Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var RuleRegistry = map[RuleID]*RetypeRule{ RuleInt96ToTimestamp: { Name: "int96-to-timestamp", MatchSchema: func(node, parent *pschema.SchemaNode) bool { return node.Type != nil && *node.Type == parquet.Type_INT96 }, TransformSchema: func(node *pschema.SchemaNode) { node.Type = new(parquet.Type_INT64) node.LogicalType = &parquet.LogicalType{ TIMESTAMP: &parquet.TimestampType{ IsAdjustedToUTC: true, Unit: &parquet.TimeUnit{ NANOS: &parquet.NanoSeconds{}, }, }, } node.ConvertedType = nil }, ConvertData: func(value any) (any, error) { s, ok := value.(string) if !ok { return nil, fmt.Errorf("expected string for INT96, got %T", value) } return int96ToNanos(s) }, TargetType: reflect.TypeFor[int64](), InputKind: reflect.String, }, RuleBsonToString: { Name: "bson-to-string", MatchSchema: func(node, parent *pschema.SchemaNode) bool { return node.LogicalType != nil && node.LogicalType.IsSetBSON() }, TransformSchema: func(node *pschema.SchemaNode) { node.LogicalType = &parquet.LogicalType{ STRING: &parquet.StringType{}, } node.ConvertedType = nil }, ConvertData: func(value any) (any, error) { s, ok := value.(string) if !ok { return nil, fmt.Errorf("expected string for BSON, got %T", value) } return bsonToJSONString(s) }, TargetType: nil, InputKind: reflect.String, }, RuleJsonToString: { Name: "json-to-string", MatchSchema: func(node, parent *pschema.SchemaNode) bool { return node.LogicalType != nil && node.LogicalType.IsSetJSON() }, TransformSchema: func(node *pschema.SchemaNode) { node.LogicalType = &parquet.LogicalType{ STRING: &parquet.StringType{}, } node.ConvertedType = nil }, ConvertData: nil, TargetType: nil, }, RuleFloat16ToFloat32: { Name: "float16-to-float32", MatchSchema: func(node, parent *pschema.SchemaNode) bool { return node.LogicalType != nil && node.LogicalType.IsSetFLOAT16() }, TransformSchema: func(node *pschema.SchemaNode) { node.Type = new(parquet.Type_FLOAT) node.LogicalType = nil node.TypeLength = nil }, ConvertData: func(value any) (any, error) { s, ok 
:= value.(string) if !ok { return nil, fmt.Errorf("expected string for FLOAT16, got %T", value) } if len(s) != 2 { return nil, fmt.Errorf("float16 requires 2 bytes, got %d", len(s)) } return types.ConvertFloat16LogicalValue(s), nil }, TargetType: reflect.TypeFor[float32](), InputKind: reflect.String, }, RuleVariantToString: { Name: "variant-to-string", MatchSchema: func(node, parent *pschema.SchemaNode) bool { return node.LogicalType != nil && node.LogicalType.IsSetVARIANT() }, TransformSchema: func(node *pschema.SchemaNode) { node.Type = new(parquet.Type_BYTE_ARRAY) node.LogicalType = &parquet.LogicalType{ STRING: &parquet.StringType{}, } node.ConvertedType = new(parquet.ConvertedType_UTF8) }, ConvertData: func(value any) (any, error) { jsonData, err := json.Marshal(value) if err != nil { return nil, fmt.Errorf("failed to marshal VARIANT to JSON: %w", err) } return string(jsonData), nil }, TargetType: reflect.TypeFor[string](), }, RuleUuidToString: { Name: "uuid-to-string", MatchSchema: func(node, parent *pschema.SchemaNode) bool { return node.LogicalType != nil && node.LogicalType.IsSetUUID() }, TransformSchema: func(node *pschema.SchemaNode) { node.Type = new(parquet.Type_BYTE_ARRAY) node.LogicalType = &parquet.LogicalType{ STRING: &parquet.StringType{}, } node.ConvertedType = new(parquet.ConvertedType_UTF8) node.TypeLength = nil }, ConvertData: func(value any) (any, error) { s, ok := value.(string) if !ok { return nil, fmt.Errorf("expected string for UUID, got %T", value) } if len(s) != 16 { return nil, fmt.Errorf("UUID requires 16 bytes, got %d", len(s)) } b := []byte(s) return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil }, TargetType: reflect.TypeFor[string](), InputKind: reflect.String, }, RuleRepeatedToList: { Name: "repeated-to-list", MatchSchema: func(node, parent *pschema.SchemaNode) bool { if parent != nil { if parent.LogicalType != nil && (parent.LogicalType.IsSetMAP() || parent.LogicalType.IsSetLIST()) { return false 
} if parent.ConvertedType != nil && (*parent.ConvertedType == parquet.ConvertedType_MAP || *parent.ConvertedType == parquet.ConvertedType_LIST) { return false } } return node.RepetitionType != nil && *node.RepetitionType == parquet.FieldRepetitionType_REPEATED && (node.ConvertedType == nil || *node.ConvertedType != parquet.ConvertedType_LIST) && (node.LogicalType == nil || !node.LogicalType.IsSetLIST()) && (node.LogicalType == nil || !node.LogicalType.IsSetMAP()) }, TransformSchema: func(node *pschema.SchemaNode) { inPath := node.InNamePath[:len(node.InNamePath):len(node.InNamePath)] exPath := node.ExNamePath[:len(node.ExNamePath):len(node.ExNamePath)] element := &pschema.SchemaNode{ SchemaElement: parquet.SchemaElement{ Name: "element", Type: node.Type, TypeLength: node.TypeLength, RepetitionType: new(parquet.FieldRepetitionType_REQUIRED), ConvertedType: node.ConvertedType, Scale: node.Scale, Precision: node.Precision, FieldID: nil, LogicalType: node.LogicalType, }, InNamePath: append(inPath, "List", "Element"), ExNamePath: append(exPath, "list", "element"), Encoding: node.Encoding, CompressionCodec: node.CompressionCodec, OmitStats: node.OmitStats, } list := &pschema.SchemaNode{ SchemaElement: parquet.SchemaElement{ Name: "list", RepetitionType: new(parquet.FieldRepetitionType_REPEATED), }, Children: []*pschema.SchemaNode{element}, InNamePath: append(inPath, "List"), ExNamePath: append(exPath, "list"), } node.Type = nil node.TypeLength = nil node.RepetitionType = new(parquet.FieldRepetitionType_REQUIRED) node.ConvertedType = new(parquet.ConvertedType_LIST) node.LogicalType = &parquet.LogicalType{LIST: &parquet.ListType{}} node.Children = []*pschema.SchemaNode{list} node.Scale = nil node.Precision = nil node.FieldID = nil node.Encoding = "" node.CompressionCodec = "" node.OmitStats = "" }, ConvertData: func(value any) (any, error) { val := reflect.ValueOf(value) if val.Kind() != reflect.Slice { return nil, fmt.Errorf("expected slice for repeated field, got %T", 
value) } if val.IsNil() { return nil, nil } list := make([]listElementWrapper, val.Len()) for i := range val.Len() { list[i] = listElementWrapper{ Element: val.Index(i).Interface(), } } return listWrapper{ List: list, }, nil }, TargetType: reflect.TypeFor[listWrapper](), }, RuleGeoToBinary: { Name: "geo-to-binary", MatchSchema: func(node, parent *pschema.SchemaNode) bool { return node.LogicalType != nil && (node.LogicalType.IsSetGEOMETRY() || node.LogicalType.IsSetGEOGRAPHY()) }, TransformSchema: func(node *pschema.SchemaNode) { node.LogicalType = nil node.ConvertedType = nil }, ConvertData: nil, TargetType: nil, }, }
RuleRegistry contains all available retype rules, indexed by RuleID.
Functions ¶
This section is empty.
Types ¶
type Cmd ¶
// Cmd holds the command-line options of the retype command. The struct tags
// define flag names, help texts and defaults.
type Cmd struct {
// Rule switches: one boolean flag per retype rule, all off by default.
Int96ToTimestamp bool `name:"int96-to-timestamp" help:"Convert INT96 columns to TIMESTAMP_NANOS." default:"false"`
BsonToString bool `name:"bson-to-string" help:"Convert BSON columns to plain strings (JSON encoded)." default:"false"`
JsonToString bool `name:"json-to-string" help:"Remove JSON logical type from columns." default:"false"`
Float16ToFloat32 bool `name:"float16-to-float32" help:"Convert FLOAT16 columns to FLOAT32." default:"false"`
VariantToString bool `name:"variant-to-string" help:"Convert VARIANT columns to plain strings (JSON encoded)." default:"false"`
UuidToString bool `name:"uuid-to-string" help:"Convert UUID columns to plain strings." default:"false"`
RepeatedToList bool `name:"repeated-to-list" help:"Convert legacy repeated primitive columns to LIST format." default:"false"`
GeoToBinary bool `name:"geo-to-binary" help:"Remove GEOGRAPHY and GEOMETRY logical types (keep as plain BYTE_ARRAY)." default:"false"`
// ReadPageSize is the page size used when reading the source (default 1000).
ReadPageSize int `help:"Page size to read from Parquet." default:"1000"`
// Source is the required input Parquet file; short flag -s.
Source string `short:"s" help:"Source Parquet file to retype." required:"true"`
// URI is the output Parquet file, tagged arg:"" rather than given a flag name.
URI string `arg:"" predictor:"file" help:"URI of output Parquet file."`
// Embedded option sets from package pio.
// NOTE(review): presumably shared reader/writer flags; confirm against pio.
pio.ReadOption
pio.WriteOption
}
Cmd is the kong command for retype.
type Converter ¶
type Converter struct {
// contains filtered or unexported fields
}
Converter handles data conversion for multiple rules.
func NewConverter ¶
func NewConverter(rules []*RetypeRule, matchedFields []map[string]struct{}) *Converter
NewConverter creates a converter for the given rules and their matched fields.
type FieldConverter ¶
// FieldConverter pairs a retype rule with the set of fields it matched.
type FieldConverter struct {
// Rule is the retype rule associated with Fields.
Rule *RetypeRule
// Fields is the set of matched fields, stored as string keys.
// NOTE(review): presumably field names or paths; confirm against the caller.
Fields map[string]struct{}
}
FieldConverter pairs a rule with its matched fields.
type RetypeRule ¶
// RetypeRule defines a transformation rule for converting Parquet column
// types: how to match columns, how to rewrite their schema and, optionally,
// how to convert their data.
type RetypeRule struct {
// Name is the identifier for this rule (e.g., "int96-to-timestamp").
Name string
// MatchSchema reports whether the schema node should be transformed by this
// rule. It receives the node together with its parent.
MatchSchema func(node, parent *pschema.SchemaNode) bool
// TransformSchema modifies the schema node in place.
TransformSchema func(*pschema.SchemaNode)
// ConvertData converts a single field value and returns the converted value
// or an error.
// If nil, no data conversion is performed (schema-only change).
ConvertData func(value any) (any, error)
// TargetType is the Go type for converted fields.
// If nil, the type remains unchanged (string to string conversions).
TargetType reflect.Type
// InputKind enforces the input kind for the rule.
// If set to reflect.Invalid (the zero value, hence the default), any input
// kind is accepted.
// Used to filter out false positive name matches in nested structures.
InputKind reflect.Kind
}
RetypeRule defines a transformation rule for converting Parquet column types. Each rule specifies how to match columns, transform their schema, and optionally convert data.
type RuleID ¶
// RuleID identifies a retype rule. It is the key type of RuleRegistry.
type RuleID int
RuleID identifies a retype rule.
// Identifiers of the built-in retype rules, numbered consecutively from zero
// via iota. Each one is a key in RuleRegistry.
const (
	// RuleInt96ToTimestamp converts INT96 columns to TIMESTAMP_NANOS.
	RuleInt96ToTimestamp RuleID = iota
	// RuleBsonToString converts BSON columns to plain strings.
	RuleBsonToString
	// RuleJsonToString removes JSON logical type from columns.
	RuleJsonToString
	// RuleFloat16ToFloat32 converts FLOAT16 columns to FLOAT32.
	RuleFloat16ToFloat32
	// RuleVariantToString converts VARIANT columns to plain strings.
	RuleVariantToString
	// RuleUuidToString converts UUID columns to plain strings.
	RuleUuidToString
	// RuleRepeatedToList converts legacy repeated primitives to LIST format.
	RuleRepeatedToList
	// RuleGeoToBinary removes GEOGRAPHY and GEOMETRY logical types.
	RuleGeoToBinary
)
Click to show internal directories.
Click to hide internal directories.