retype

package
v1.47.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: BSD-3-Clause Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// RuleRegistry maps every RuleID to the RetypeRule that implements it.
//
// Each entry supplies a schema matcher, an in-place schema transform and,
// where the stored values must change as well, a per-value converter.
// Entries whose ConvertData is nil (json-to-string, geo-to-binary) only
// rewrite the schema.
//
// The new(value) calls below use the form of the new builtin that takes a
// value operand and yields a pointer to a copy of it.
var RuleRegistry = map[RuleID]*RetypeRule{
	// INT96 -> INT64 annotated as TIMESTAMP(NANOS).
	RuleInt96ToTimestamp: {
		Name: "int96-to-timestamp",
		// Matches any node whose physical type is INT96; the parent is not consulted.
		MatchSchema: func(node, parent *pschema.SchemaNode) bool {
			return node.Type != nil && *node.Type == parquet.Type_INT96
		},
		// Switches the physical type to INT64, attaches a nanosecond
		// TIMESTAMP logical type flagged as adjusted to UTC, and drops any
		// converted type.
		TransformSchema: func(node *pschema.SchemaNode) {
			node.Type = new(parquet.Type_INT64)
			node.LogicalType = &parquet.LogicalType{
				TIMESTAMP: &parquet.TimestampType{
					IsAdjustedToUTC: true,
					Unit: &parquet.TimeUnit{
						NANOS: &parquet.NanoSeconds{},
					},
				},
			}
			node.ConvertedType = nil
		},
		// Requires a string input and delegates to int96ToNanos (defined
		// elsewhere in the package); presumably the string holds the raw
		// INT96 bytes — confirm against the reader.
		ConvertData: func(value any) (any, error) {
			s, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("expected string for INT96, got %T", value)
			}
			return int96ToNanos(s)
		},
		TargetType: reflect.TypeFor[int64](),
		InputKind:  reflect.String,
	},
	// BSON -> STRING; values are rewritten by bsonToJSONString.
	RuleBsonToString: {
		Name: "bson-to-string",
		// Matches nodes carrying the BSON logical type.
		MatchSchema: func(node, parent *pschema.SchemaNode) bool {
			return node.LogicalType != nil && node.LogicalType.IsSetBSON()
		},
		TransformSchema: func(node *pschema.SchemaNode) {
			// The physical type is left untouched; only the annotation changes to STRING.
			node.LogicalType = &parquet.LogicalType{
				STRING: &parquet.StringType{},
			}
			node.ConvertedType = nil
		},
		// Requires a string input and delegates to bsonToJSONString
		// (defined elsewhere in the package).
		ConvertData: func(value any) (any, error) {
			s, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("expected string for BSON, got %T", value)
			}
			return bsonToJSONString(s)
		},
		// nil: the Go type of the field stays as it is (string in, string out).
		TargetType: nil,
		InputKind:  reflect.String,
	},
	// JSON -> STRING; schema-only, values are not touched.
	RuleJsonToString: {
		Name: "json-to-string",
		// Matches nodes carrying the JSON logical type.
		MatchSchema: func(node, parent *pschema.SchemaNode) bool {
			return node.LogicalType != nil && node.LogicalType.IsSetJSON()
		},
		TransformSchema: func(node *pschema.SchemaNode) {
			// The physical type is left untouched; only the annotation changes to STRING.
			node.LogicalType = &parquet.LogicalType{
				STRING: &parquet.StringType{},
			}
			node.ConvertedType = nil
		},
		// No data conversion and no Go type change.
		ConvertData: nil,
		TargetType:  nil,
	},
	// FLOAT16 -> FLOAT.
	RuleFloat16ToFloat32: {
		Name: "float16-to-float32",
		// Matches nodes carrying the FLOAT16 logical type.
		MatchSchema: func(node, parent *pschema.SchemaNode) bool {
			return node.LogicalType != nil && node.LogicalType.IsSetFLOAT16()
		},
		// Switches the physical type to FLOAT and drops both the logical
		// type and the type length.
		TransformSchema: func(node *pschema.SchemaNode) {
			node.Type = new(parquet.Type_FLOAT)
			node.LogicalType = nil
			node.TypeLength = nil
		},
		// Requires a string of exactly 2 bytes and delegates the decoding to
		// types.ConvertFloat16LogicalValue.
		ConvertData: func(value any) (any, error) {
			s, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("expected string for FLOAT16, got %T", value)
			}
			if len(s) != 2 {
				return nil, fmt.Errorf("float16 requires 2 bytes, got %d", len(s))
			}
			return types.ConvertFloat16LogicalValue(s), nil
		},
		TargetType: reflect.TypeFor[float32](),
		InputKind:  reflect.String,
	},
	// VARIANT -> STRING holding the JSON encoding of the value.
	RuleVariantToString: {
		Name: "variant-to-string",
		// Matches nodes carrying the VARIANT logical type.
		MatchSchema: func(node, parent *pschema.SchemaNode) bool {
			return node.LogicalType != nil && node.LogicalType.IsSetVARIANT()
		},
		TransformSchema: func(node *pschema.SchemaNode) {
			// Rewrite the node as a BYTE_ARRAY annotated STRING / UTF8.
			// NOTE(review): node.Children is not modified here; if VARIANT
			// nodes are groups, confirm the caller removes their children.
			node.Type = new(parquet.Type_BYTE_ARRAY)
			node.LogicalType = &parquet.LogicalType{
				STRING: &parquet.StringType{},
			}
			node.ConvertedType = new(parquet.ConvertedType_UTF8)
		},
		// Marshals whatever value it receives with json.Marshal. InputKind
		// is left unset for this rule, so no input kind is enforced.
		ConvertData: func(value any) (any, error) {
			jsonData, err := json.Marshal(value)
			if err != nil {
				return nil, fmt.Errorf("failed to marshal VARIANT to JSON: %w", err)
			}
			return string(jsonData), nil
		},
		TargetType: reflect.TypeFor[string](),
	},
	// UUID -> STRING in hyphenated hexadecimal form.
	RuleUuidToString: {
		Name: "uuid-to-string",
		// Matches nodes carrying the UUID logical type.
		MatchSchema: func(node, parent *pschema.SchemaNode) bool {
			return node.LogicalType != nil && node.LogicalType.IsSetUUID()
		},
		TransformSchema: func(node *pschema.SchemaNode) {
			// Rewrite the node as a BYTE_ARRAY annotated STRING / UTF8 and drop the type length.
			node.Type = new(parquet.Type_BYTE_ARRAY)
			node.LogicalType = &parquet.LogicalType{
				STRING: &parquet.StringType{},
			}
			node.ConvertedType = new(parquet.ConvertedType_UTF8)
			node.TypeLength = nil
		},
		// Requires a string of exactly 16 bytes and renders it as lowercase
		// hex in byte groups of 4-2-2-2-6 separated by hyphens.
		ConvertData: func(value any) (any, error) {
			s, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("expected string for UUID, got %T", value)
			}
			if len(s) != 16 {
				return nil, fmt.Errorf("UUID requires 16 bytes, got %d", len(s))
			}
			b := []byte(s)
			return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
		},
		TargetType: reflect.TypeFor[string](),
		InputKind:  reflect.String,
	},
	// Bare REPEATED field -> LIST group (node -> list -> element).
	RuleRepeatedToList: {
		Name: "repeated-to-list",
		MatchSchema: func(node, parent *pschema.SchemaNode) bool {
			// A REPEATED node directly under a LIST or MAP parent (by logical
			// or converted type) is never matched.
			if parent != nil {
				if parent.LogicalType != nil && (parent.LogicalType.IsSetMAP() || parent.LogicalType.IsSetLIST()) {
					return false
				}
				if parent.ConvertedType != nil && (*parent.ConvertedType == parquet.ConvertedType_MAP || *parent.ConvertedType == parquet.ConvertedType_LIST) {
					return false
				}
			}
			// Match REPEATED nodes that are not themselves annotated LIST
			// (converted or logical type) or MAP (logical type).
			// NOTE(review): node.Type is not checked, so a REPEATED node without
			// a physical type would match too — confirm callers only pass leaves.
			return node.RepetitionType != nil && *node.RepetitionType == parquet.FieldRepetitionType_REPEATED &&
				(node.ConvertedType == nil || *node.ConvertedType != parquet.ConvertedType_LIST) &&
				(node.LogicalType == nil || !node.LogicalType.IsSetLIST()) &&
				(node.LogicalType == nil || !node.LogicalType.IsSetMAP())
		},
		TransformSchema: func(node *pschema.SchemaNode) {
			// Full slice expressions cap capacity at length, so every append
			// below allocates a new backing array instead of sharing one.
			inPath := node.InNamePath[:len(node.InNamePath):len(node.InNamePath)]
			exPath := node.ExNamePath[:len(node.ExNamePath):len(node.ExNamePath)]
			// element takes over the original node's type information as a
			// REQUIRED child; its field ID is not carried over.
			element := &pschema.SchemaNode{
				SchemaElement: parquet.SchemaElement{
					Name:           "element",
					Type:           node.Type,
					TypeLength:     node.TypeLength,
					RepetitionType: new(parquet.FieldRepetitionType_REQUIRED),
					ConvertedType:  node.ConvertedType,
					Scale:          node.Scale,
					Precision:      node.Precision,
					FieldID:        nil,
					LogicalType:    node.LogicalType,
				},
				// Paths extend the original node's paths by List/Element and list/element.
				InNamePath:       append(inPath, "List", "Element"),
				ExNamePath:       append(exPath, "list", "element"),
				Encoding:         node.Encoding,
				CompressionCodec: node.CompressionCodec,
				OmitStats:        node.OmitStats,
			}
			// list is the REPEATED middle node whose only child is element.
			list := &pschema.SchemaNode{
				SchemaElement: parquet.SchemaElement{
					Name:           "list",
					RepetitionType: new(parquet.FieldRepetitionType_REPEATED),
				},
				Children:   []*pschema.SchemaNode{element},
				InNamePath: append(inPath, "List"),
				ExNamePath: append(exPath, "list"),
			}
			// The original node becomes a REQUIRED, LIST-annotated node with
			// no physical type and list as its only child.
			node.Type = nil
			node.TypeLength = nil
			node.RepetitionType = new(parquet.FieldRepetitionType_REQUIRED)
			node.ConvertedType = new(parquet.ConvertedType_LIST)
			node.LogicalType = &parquet.LogicalType{LIST: &parquet.ListType{}}
			node.Children = []*pschema.SchemaNode{list}
			// Attributes copied onto element above are cleared on the wrapper;
			// the field ID is cleared as well.
			node.Scale = nil
			node.Precision = nil
			node.FieldID = nil
			node.Encoding = ""
			node.CompressionCodec = ""
			node.OmitStats = ""
		},
		// Wraps each item of the input slice in a listElementWrapper and
		// returns them inside a listWrapper. A nil slice yields (nil, nil);
		// any non-slice input is an error.
		ConvertData: func(value any) (any, error) {
			val := reflect.ValueOf(value)
			if val.Kind() != reflect.Slice {
				return nil, fmt.Errorf("expected slice for repeated field, got %T", value)
			}
			if val.IsNil() {
				return nil, nil
			}

			list := make([]listElementWrapper, val.Len())
			for i := range val.Len() {
				list[i] = listElementWrapper{
					Element: val.Index(i).Interface(),
				}
			}

			return listWrapper{
				List: list,
			}, nil
		},
		TargetType: reflect.TypeFor[listWrapper](),
	},
	// GEOMETRY / GEOGRAPHY -> unannotated; schema-only, values are not touched.
	RuleGeoToBinary: {
		Name: "geo-to-binary",
		// Matches nodes carrying either the GEOMETRY or the GEOGRAPHY logical type.
		MatchSchema: func(node, parent *pschema.SchemaNode) bool {
			return node.LogicalType != nil && (node.LogicalType.IsSetGEOMETRY() || node.LogicalType.IsSetGEOGRAPHY())
		},
		// Strips both the logical and the converted type; the physical type is kept.
		TransformSchema: func(node *pschema.SchemaNode) {
			node.LogicalType = nil
			node.ConvertedType = nil
		},
		// No data conversion and no Go type change.
		ConvertData: nil,
		TargetType:  nil,
	},
}

RuleRegistry contains all available retype rules, indexed by RuleID.

Functions

This section is empty.

Types

type Cmd

// Cmd holds the command-line options of the retype command. The struct tags
// give each option its flag name, help text and default value.
type Cmd struct {
	// Rule switches, one per retype rule; each defaults to false.
	Int96ToTimestamp bool   `name:"int96-to-timestamp" help:"Convert INT96 columns to TIMESTAMP_NANOS." default:"false"`
	BsonToString     bool   `name:"bson-to-string" help:"Convert BSON columns to plain strings (JSON encoded)." default:"false"`
	JsonToString     bool   `name:"json-to-string" help:"Remove JSON logical type from columns." default:"false"`
	Float16ToFloat32 bool   `name:"float16-to-float32" help:"Convert FLOAT16 columns to FLOAT32." default:"false"`
	VariantToString  bool   `name:"variant-to-string" help:"Convert VARIANT columns to plain strings (JSON encoded)." default:"false"`
	UuidToString     bool   `name:"uuid-to-string" help:"Convert UUID columns to plain strings." default:"false"`
	RepeatedToList   bool   `name:"repeated-to-list" help:"Convert legacy repeated primitive columns to LIST format." default:"false"`
	GeoToBinary      bool   `name:"geo-to-binary" help:"Remove GEOGRAPHY and GEOMETRY logical types (keep as plain BYTE_ARRAY)." default:"false"`
	// Input and output settings: Source (-s) is the required input file and
	// URI is the positional argument naming the output file.
	ReadPageSize     int    `help:"Page size to read from Parquet." default:"1000"`
	Source           string `short:"s" help:"Source Parquet file to retype." required:"true"`
	URI              string `arg:"" predictor:"file" help:"URI of output Parquet file."`
	// Embedded option sets from the pio package (presumably shared reader and
	// writer flags — confirm in pio).
	pio.ReadOption
	pio.WriteOption
}

Cmd is the kong command definition for retype.

func (Cmd) Run

func (c Cmd) Run() (retErr error)

Run performs the actual retype job.

type Converter

type Converter struct {
	// contains filtered or unexported fields
}

Converter handles data conversion for multiple rules.

func NewConverter

func NewConverter(rules []*RetypeRule, matchedFields []map[string]struct{}) *Converter

NewConverter creates a converter for the given rules and their matched fields.

func (*Converter) Convert

func (c *Converter) Convert(row any) (any, error)

Convert transforms a row according to all active rules.

type FieldConverter

// FieldConverter pairs a rule with its matched fields.
type FieldConverter struct {
	// Rule is the retype rule to apply.
	Rule   *RetypeRule
	// Fields is the set of fields matched by Rule; the string keys are
	// presumably field names or paths — confirm against the code that
	// builds the set.
	Fields map[string]struct{}
}

FieldConverter pairs a rule with its matched fields.

type RetypeRule

// RetypeRule defines a transformation rule for converting Parquet column
// types. Each rule specifies how to match columns, how to transform their
// schema and, optionally, how to convert their data.
type RetypeRule struct {
	// Name is the identifier for this rule (e.g., "int96-to-timestamp")
	Name string

	// MatchSchema returns true if the schema node should be transformed by this rule
	MatchSchema func(node, parent *pschema.SchemaNode) bool

	// TransformSchema modifies the schema node in place
	TransformSchema func(*pschema.SchemaNode)

	// ConvertData converts a field value. Returns the converted value.
	// If nil, no data conversion is performed (schema-only change).
	ConvertData func(value any) (any, error)

	// TargetType is the Go type for converted fields.
	// If nil, the type remains unchanged (string to string conversions).
	TargetType reflect.Type

	// InputKind enforces the input kind for the rule.
	// If set to reflect.Invalid (default), any input kind is accepted.
	// Used to filter out false positive name matches in nested structures.
	InputKind reflect.Kind
}

RetypeRule defines a transformation rule for converting Parquet column types. Each rule specifies how to match columns, transform their schema, and optionally convert data.

type RuleID

// RuleID identifies a retype rule.
type RuleID int

RuleID identifies a retype rule.

// Identifiers of the available retype rules. Values are assigned by iota in
// declaration order, starting at 0 with RuleInt96ToTimestamp, so inserting a
// constant anywhere but at the end renumbers the ones after it.
const (
	// RuleInt96ToTimestamp converts INT96 columns to TIMESTAMP_NANOS.
	RuleInt96ToTimestamp RuleID = iota
	// RuleBsonToString converts BSON columns to plain strings.
	RuleBsonToString
	// RuleJsonToString removes JSON logical type from columns.
	RuleJsonToString
	// RuleFloat16ToFloat32 converts FLOAT16 columns to FLOAT32.
	RuleFloat16ToFloat32
	// RuleVariantToString converts VARIANT columns to plain strings.
	RuleVariantToString
	// RuleUuidToString converts UUID columns to plain strings.
	RuleUuidToString
	// RuleRepeatedToList converts legacy repeated primitives to LIST format.
	RuleRepeatedToList
	// RuleGeoToBinary removes GEOGRAPHY and GEOMETRY logical types.
	RuleGeoToBinary
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL