diff --git a/.golangci.yml b/.golangci.yml index d56c04b..45b8196 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -80,7 +80,7 @@ issues: # Excluding configuration per-path, per-linter, per-text and per-source exclude-rules: # Some code is just exceptional. Review whenever the code is changed. - - path: influxdb3/query_iterator\.go + - path: influxdb3/query_iterator\.go,influxdb3/point_value_iterator\.go linters: - gocyclo # Exclude some linters from running on tests files. diff --git a/CHANGELOG.md b/CHANGELOG.md index d09215e..c9961c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ 1. [#127](https://github.com/InfluxCommunity/influxdb3-go/pull/127): LPBatcher now returns first line of the internal buffer when the line length exceeds the batch size. +### Features + +1. [#131](https://github.com/InfluxCommunity/influxdb3-go/pull/131): Add new PointValueIterator based on google + guidelines [Guidelines](https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines) + ## 2.0.0 [2024-12-13] ### Breaking Changes diff --git a/influxdb3/client_e2e_test.go b/influxdb3/client_e2e_test.go index c18dd2b..cadd7a3 100644 --- a/influxdb3/client_e2e_test.go +++ b/influxdb3/client_e2e_test.go @@ -27,6 +27,7 @@ package influxdb3_test import ( "context" + "errors" "fmt" "log/slog" "math" @@ -239,6 +240,112 @@ func TestQueryWithParameters(t *testing.T) { assert.True(t, iterator.Done()) } +func TestQueryPointValue(t *testing.T) { + SkipCheck(t) + now := time.Now().UTC() + testId := now.UnixNano() + + url := os.Getenv("TESTING_INFLUXDB_URL") + token := os.Getenv("TESTING_INFLUXDB_TOKEN") + database := os.Getenv("TESTING_INFLUXDB_DATABASE") + + client, err := influxdb3.New(influxdb3.ClientConfig{ + Host: url, + Token: token, + Database: database, + }) + require.NoError(t, err) + defer client.Close() + + p := influxdb3.NewPointWithMeasurement("weather5"). + SetField("text", "a1"). + SetField("testId", testId). + SetTimestamp(now) + err = client.WritePoints(context.Background(), []*influxdb3.Point{p}) + require.NoError(t, err) + query := fmt.Sprintf(` + SELECT * + FROM weather5 + WHERE + time >= now() - interval '10 minute' + AND + "testId" = %d + `, testId) + + sleepTime := 5 * time.Second + time.Sleep(sleepTime) + + pointValueIterator, err := client.QueryPointValue(context.Background(), query) + require.NoError(t, err) + require.NotNil(t, pointValueIterator) + + PointValue, err := pointValueIterator.Next() + assert.NoError(t, err) + assert.NotNil(t, PointValue) + assert.Equal(t, PointValue.GetField("text"), "a1") + + PointValue, err = pointValueIterator.Next() + assert.Equal(t, influxdb3.Done, errors.New("no more items in iterator")) + assert.Nil(t, PointValue) +} + +func TestQueryPointValueWithParameters(t *testing.T) { + SkipCheck(t) + now := time.Now().UTC() + testId := now.UnixNano() + + url := os.Getenv("TESTING_INFLUXDB_URL") + token := os.Getenv("TESTING_INFLUXDB_TOKEN") + database := os.Getenv("TESTING_INFLUXDB_DATABASE") + + client, err := influxdb3.New(influxdb3.ClientConfig{ + Host: url, + Token: token, + Database: database, + }) + require.NoError(t, err) + defer client.Close() + + p := influxdb3.NewPointWithMeasurement("weather5"). + SetField("text", "a1"). + SetField("testId", testId). + SetTimestamp(now) + err = client.WritePoints(context.Background(), []*influxdb3.Point{p}) + require.NoError(t, err) + + query := ` + SELECT * + FROM weather5 + WHERE + time >= now() - interval '10 minute' + AND + text = $text + AND + "testId" = $testId + ORDER BY time + ` + parameters := influxdb3.QueryParameters{ + "text": "a1", + "testId": testId, + } + + sleepTime := 5 * time.Second + time.Sleep(sleepTime) + + pointValueIterator, err := client.QueryPointValueWithParameters(context.Background(), query, parameters) + require.NoError(t, err) + require.NotNil(t, pointValueIterator) + + PointValue, err := pointValueIterator.Next() + assert.NoError(t, err) + assert.NotNil(t, PointValue) + assert.Equal(t, PointValue.GetField("text"), "a1") + + PointValue, err = pointValueIterator.Next() + assert.Equal(t, influxdb3.Done, errors.New("no more items in iterator")) + assert.Nil(t, PointValue) +} + func TestQueryDatabaseDoesNotExist(t *testing.T) { SkipCheck(t) url := os.Getenv("TESTING_INFLUXDB_URL") diff --git a/influxdb3/point_value_iterator.go b/influxdb3/point_value_iterator.go new file mode 100644 index 0000000..05ba7d7 --- /dev/null +++ b/influxdb3/point_value_iterator.go @@ -0,0 +1,91 @@ +/* + The MIT License + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +*/ + +package influxdb3 + +import ( + "errors" + + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/flight" +) + +var Done = errors.New("no more items in iterator") //nolint:all + +// PointValueIterator is a custom query iterator that encapsulates and simplifies the logic for +// the flight reader. It provides method Next to consume the flight reader, +// +// The PointValueIterator Next function will return response as a *PointValues object representing the current row +type PointValueIterator struct { + reader *flight.Reader + // Index of row of current object in current record + index int + // Current record + record arrow.Record +} + +// Return a new PointValueIterator +func newPointValueIterator(reader *flight.Reader) *PointValueIterator { + return &PointValueIterator{ + reader: reader, + index: -1, + record: nil, + } +} + +// Next returns the next result. +// Its second return value is iterator.Done if there are no more results. +// Once Next returns Done in the second parameter, all subsequent calls will return Done. +// +// it := newPointValueIterator(flightReader) +// for { +// PointValue, err := it.Next() +// if err == iterator.Done { +// break +// } +// if err != nil { +// return err +// } +// process(PointValue) +// } +func (it *PointValueIterator) Next() (*PointValues, error) { + it.index++ + + for it.record == nil || it.index >= int(it.record.NumRows()) { + if !it.reader.Next() { + if err := it.reader.Err(); err != nil { + return nil, err + } + return nil, Done + } + it.record = it.reader.Record() + it.index = 0 + } + + pointValues := rowToPointValue(it.reader.Record(), it.index) + return pointValues, nil +} + +// Index return the current index of PointValueIterator +func (it *PointValueIterator) Index() int { + return it.index +} diff --git a/influxdb3/point_value_iterator_test.go b/influxdb3/point_value_iterator_test.go new file mode 100644 index 0000000..22f66df --- /dev/null +++ b/influxdb3/point_value_iterator_test.go @@ -0,0 +1,230 @@ +/* + The MIT License + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +*/ + +package influxdb3 + +import ( + "bytes" + "errors" + "slices" + "testing" + + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/flight" + "github.com/apache/arrow/go/v15/arrow/float16" + "github.com/apache/arrow/go/v15/arrow/ipc" + "github.com/apache/arrow/go/v15/arrow/memory" + "github.com/stretchr/testify/assert" +) + +func TestPointValueIterator(t *testing.T) { + schema := arrow.NewSchema([]arrow.Field{ + {Name: "f0", Type: arrow.PrimitiveTypes.Int64}, + {Name: "f1", Type: arrow.PrimitiveTypes.Uint8}, + {Name: "f2", Type: arrow.PrimitiveTypes.Int8}, + {Name: "f3", Type: arrow.PrimitiveTypes.Uint16}, + {Name: "f4", Type: arrow.PrimitiveTypes.Int16}, + {Name: "f5", Type: arrow.PrimitiveTypes.Uint32}, + {Name: "f6", Type: arrow.BinaryTypes.LargeBinary}, + {Name: "f7", Type: arrow.BinaryTypes.LargeString}, + {Name: "f8", Type: arrow.BinaryTypes.Binary}, + {Name: "f9", Type: arrow.PrimitiveTypes.Date32}, + {Name: "f10", Type: arrow.PrimitiveTypes.Date64}, + {Name: "f11", Type: arrow.FixedWidthTypes.Float16}, + {Name: "f12", Type: arrow.PrimitiveTypes.Float32}, + {Name: "f13", Type: arrow.FixedWidthTypes.Time32s}, + {Name: "f14", Type: arrow.FixedWidthTypes.Time64us}, + {Name: "f15", Type: arrow.FixedWidthTypes.MonthInterval}, + {Name: "f16", Type: arrow.FixedWidthTypes.DayTimeInterval}, + {Name: "f17", Type: arrow.FixedWidthTypes.Duration_s}, + }, nil) + + var buf bytes.Buffer + writer := ipc.NewWriter(&buf, ipc.WithSchema(schema)) + defer writer.Close() + + rb := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer rb.Release() + rec := rb.NewRecord() // first record is empty + _ = writer.Write(rec) + + rb.Field(0).(*array.Int64Builder).Append(0) + rb.Field(1).(*array.Uint8Builder).Append(1) + rb.Field(2).(*array.Int8Builder).Append(2) + rb.Field(3).(*array.Uint16Builder).Append(3) + rb.Field(4).(*array.Int16Builder).Append(4) + rb.Field(5).(*array.Uint32Builder).Append(5) + rb.Field(6).(*array.BinaryBuilder).Append([]byte{6}) + rb.Field(7).(*array.LargeStringBuilder).Append("7") + rb.Field(8).(*array.BinaryBuilder).Append([]byte{8}) + rb.Field(9).(*array.Date32Builder).Append(arrow.Date32(int32(9))) + rb.Field(10).(*array.Date64Builder).Append(arrow.Date64(int64(10))) + rb.Field(11).(*array.Float16Builder).Append(float16.New(11)) + rb.Field(12).(*array.Float32Builder).Append(float32(12)) + rb.Field(13).(*array.Time32Builder).Append(arrow.Time32(int32(13))) + rb.Field(14).(*array.Time64Builder).Append(arrow.Time64(int64(14))) + rb.Field(15).(*array.MonthIntervalBuilder).Append(arrow.MonthInterval(int32(15))) + rb.Field(16).(*array.DayTimeIntervalBuilder).AppendNull() + rb.Field(17).(*array.DurationBuilder).Append(arrow.Duration(int64(17))) + + rec = rb.NewRecord() + _ = writer.Write(rec) + + rb.Field(0).(*array.Int64Builder).Append(0) + rb.Field(1).(*array.Uint8Builder).Append(1) + rb.Field(2).(*array.Int8Builder).Append(2) + rb.Field(3).(*array.Uint16Builder).Append(3) + rb.Field(4).(*array.Int16Builder).Append(4) + rb.Field(5).(*array.Uint32Builder).Append(5) + rb.Field(6).(*array.BinaryBuilder).Append([]byte{6}) + rb.Field(7).(*array.LargeStringBuilder).Append("7") + rb.Field(8).(*array.BinaryBuilder).Append([]byte{8}) + rb.Field(9).(*array.Date32Builder).Append(arrow.Date32(int32(9))) + rb.Field(10).(*array.Date64Builder).Append(arrow.Date64(int64(10))) + rb.Field(11).(*array.Float16Builder).Append(float16.New(11)) + rb.Field(12).(*array.Float32Builder).Append(float32(12)) + rb.Field(13).(*array.Time32Builder).Append(arrow.Time32(int32(13))) + rb.Field(14).(*array.Time64Builder).Append(arrow.Time64(int64(14))) + rb.Field(15).(*array.MonthIntervalBuilder).Append(arrow.MonthInterval(int32(15))) + rb.Field(16).(*array.DayTimeIntervalBuilder).AppendNull() + rb.Field(17).(*array.DurationBuilder).Append(arrow.Duration(int64(17))) + + rec = rb.NewRecord() + _ = writer.Write(rec) + + reader := ipc.NewMessageReader(&buf) + + ipcReader, err := ipc.NewReaderFromMessageReader( + &testMessagesReader{ + r: reader, + }) + assert.NoError(t, err) + + fReader := &flight.Reader{Reader: ipcReader} + it := newPointValueIterator(fReader) + + var resultSet0 []int64 + var resultSet1 []interface{} + var resultSet2 []interface{} + var resultSet3 []interface{} + var resultSet4 []interface{} + var resultSet5 []interface{} + var resultSet6 []interface{} + var resultSet7 []interface{} + var resultSet8 []interface{} + var resultSet9 []interface{} + var resultSet10 []interface{} + var resultSet11 []interface{} + var resultSet12 []interface{} + var resultSet13 []interface{} + var resultSet14 []interface{} + var resultSet15 []interface{} + var resultSet16 []interface{} + var resultSet17 []interface{} + + for { + pointValues, err := it.Next() + if errors.Is(err, Done) { + break + } + assert.NotNil(t, pointValues) + assert.NoError(t, err) + + resultSet0 = append(resultSet0, *pointValues.GetIntegerField("f0")) + resultSet1 = append(resultSet1, pointValues.GetField("f1")) + resultSet2 = append(resultSet2, pointValues.GetField("f2")) + resultSet3 = append(resultSet3, pointValues.GetField("f3")) + resultSet4 = append(resultSet4, pointValues.GetField("f4")) + resultSet5 = append(resultSet5, pointValues.GetField("f5")) + resultSet6 = append(resultSet6, pointValues.GetField("f6")) + resultSet7 = append(resultSet7, pointValues.GetField("f7")) + resultSet8 = append(resultSet8, pointValues.GetField("f8")) + resultSet9 = append(resultSet9, pointValues.GetField("f9")) + resultSet10 = append(resultSet10, pointValues.GetField("f10")) + resultSet11 = append(resultSet11, pointValues.GetField("f11")) + resultSet12 = append(resultSet12, pointValues.GetField("f12")) + resultSet13 = append(resultSet13, pointValues.GetField("f13")) + resultSet14 = append(resultSet14, pointValues.GetField("f14")) + resultSet15 = append(resultSet15, pointValues.GetField("f15")) + resultSet16 = append(resultSet16, pointValues.GetField("f16")) + resultSet17 = append(resultSet17, pointValues.GetField("f17")) + } + + assert.True(t, slices.Equal([]int64{0, 0}, resultSet0)) + + assert.Equal(t, uint8(1), resultSet1[0]) + assert.Equal(t, uint8(1), resultSet1[1]) + + assert.Equal(t, int8(2), resultSet2[0]) + assert.Equal(t, int8(2), resultSet2[1]) + + assert.Equal(t, uint16(3), resultSet3[0]) + assert.Equal(t, uint16(3), resultSet3[1]) + + assert.Equal(t, int16(4), resultSet4[0]) + assert.Equal(t, int16(4), resultSet4[1]) + + assert.Equal(t, uint32(5), resultSet5[0]) + assert.Equal(t, uint32(5), resultSet5[1]) + + assert.Equal(t, uint8(6), resultSet6[0].([]uint8)[0]) + assert.Equal(t, uint8(6), resultSet6[1].([]uint8)[0]) + + assert.Equal(t, "7", resultSet7[0]) + assert.Equal(t, "7", resultSet7[1]) + + assert.Equal(t, uint8(8), resultSet8[0].([]uint8)[0]) + assert.Equal(t, uint8(8), resultSet8[1].([]uint8)[0]) + + assert.Equal(t, arrow.Date32(int32(9)), resultSet9[0]) + assert.Equal(t, arrow.Date32(int32(9)), resultSet9[1]) + + assert.Equal(t, arrow.Date64(int64(10)), resultSet10[0]) + assert.Equal(t, arrow.Date64(int64(10)), resultSet10[1]) + + assert.Equal(t, uint16(18816), resultSet11[0].(float16.Num).Uint16()) + assert.Equal(t, uint16(18816), resultSet11[1].(float16.Num).Uint16()) + + assert.InEpsilon(t, float32(12), resultSet12[0], 0.01) + assert.InEpsilon(t, float32(12), resultSet12[1], 0.01) + + assert.Equal(t, arrow.Time32(int32(13)), resultSet13[0]) + assert.Equal(t, arrow.Time32(int32(13)), resultSet13[1]) + + assert.Equal(t, arrow.Time64(int64(14)), resultSet14[0]) + assert.Equal(t, arrow.Time64(int64(14)), resultSet14[1]) + + assert.Equal(t, arrow.MonthInterval(int32(15)), resultSet15[0]) + assert.Equal(t, arrow.MonthInterval(int32(15)), resultSet15[1]) + + assert.Nil(t, resultSet16[0]) + assert.Nil(t, resultSet16[1]) + + assert.Equal(t, resultSet17[0], arrow.Duration(int64(17))) + assert.Equal(t, resultSet17[1], arrow.Duration(int64(17))) + + pointValues, err := it.Next() + assert.Equal(t, 2, it.Index()) + assert.Equal(t, err, Done) + assert.Nil(t, pointValues) +} diff --git a/influxdb3/query.go b/influxdb3/query.go index 7a70210..4f16656 100644 --- a/influxdb3/query.go +++ b/influxdb3/query.go @@ -87,6 +87,19 @@ func (c *Client) Query(ctx context.Context, query string, options ...QueryOption return c.query(ctx, query, nil, newQueryOptions(&DefaultQueryOptions, options)) } +// QueryPointValue queries data from InfluxDB v3. +// Parameters: +// - ctx: The context.Context to use for the request. +// - query: The query string to execute. +// - options: The optional query options. See QueryOption for available options. +// +// Returns: +// - A result iterator (*PointValueIterator). +// - An error, if any. +func (c *Client) QueryPointValue(ctx context.Context, query string, options ...QueryOption) (*PointValueIterator, error) { + return c.queryPointValue(ctx, query, nil, newQueryOptions(&DefaultQueryOptions, options)) +} + // QueryWithParameters queries data from InfluxDB v3 with parameterized query. // Parameters: // - ctx: The context.Context to use for the request. @@ -102,6 +115,21 @@ func (c *Client) QueryWithParameters(ctx context.Context, query string, paramete return c.query(ctx, query, parameters, newQueryOptions(&DefaultQueryOptions, options)) } +// QueryPointValueWithParameters queries data from InfluxDB v3 with parameterized query. +// Parameters: +// - ctx: The context.Context to use for the request. +// - query: The query string to execute. +// - parameters: The query parameters. +// - options: The optional query options. See QueryOption for available options. +// +// Returns: +// - A result iterator (*PointValueIterator). +// - An error, if any. +func (c *Client) QueryPointValueWithParameters(ctx context.Context, query string, parameters QueryParameters, + options ...QueryOption) (*PointValueIterator, error) { + return c.queryPointValue(ctx, query, parameters, newQueryOptions(&DefaultQueryOptions, options)) +} + // QueryWithOptions Query data from InfluxDB v3 with query options. // Parameters: // - ctx: The context.Context to use for the request. @@ -122,6 +150,24 @@ func (c *Client) QueryWithOptions(ctx context.Context, options *QueryOptions, qu } func (c *Client) query(ctx context.Context, query string, parameters QueryParameters, options *QueryOptions) (*QueryIterator, error) { + reader, err := c.getReader(ctx, query, parameters, options) + if err != nil { + return nil, err + } + + return newQueryIterator(reader), nil +} + +func (c *Client) queryPointValue(ctx context.Context, query string, parameters QueryParameters, options *QueryOptions) (*PointValueIterator, error) { + reader, err := c.getReader(ctx, query, parameters, options) + if err != nil { + return nil, err + } + + return newPointValueIterator(reader), nil +} + +func (c *Client) getReader(ctx context.Context, query string, parameters QueryParameters, options *QueryOptions) (*flight.Reader, error) { var database string if options.Database != "" { database = options.Database @@ -175,6 +221,5 @@ func (c *Client) query(ctx context.Context, query string, parameters QueryParame return nil, fmt.Errorf("flight reader: %w", err) } - iterator := newQueryIterator(reader) - return iterator, nil + return reader, nil } diff --git a/influxdb3/query_iterator.go b/influxdb3/query_iterator.go index 537c1e7..cc7718e 100644 --- a/influxdb3/query_iterator.go +++ b/influxdb3/query_iterator.go @@ -111,13 +111,17 @@ func (i *QueryIterator) Next() bool { // AsPoints return data from InfluxDB v3 into PointValues structure. func (i *QueryIterator) AsPoints() *PointValues { - readerSchema := i.reader.Schema() + return rowToPointValue(i.record, i.indexInRecord) +} + +func rowToPointValue(record arrow.Record, rowIndex int) *PointValues { + readerSchema := record.Schema() p := NewPointValues("") - for ci, col := range i.record.Columns() { + for ci, col := range record.Columns() { field := readerSchema.Field(ci) name := field.Name - value, columnType, err := getArrowValue(col, field, i.indexInRecord) + value, columnType, err := getArrowValue(col, field, rowIndex) if err != nil { panic(err) } @@ -199,74 +203,48 @@ func getArrowValue(arrayNoType arrow.Array, field arrow.Field, i int) (any, resp if arrayNoType.IsNull(i) { return nil, columnType, nil } - var value any - switch arrayNoType.DataType().ID() { - case arrow.NULL: - value = nil - case arrow.BOOL: - value = arrayNoType.(*array.Boolean).Value(i) - case arrow.UINT8: - value = arrayNoType.(*array.Uint8).Value(i) - case arrow.INT8: - value = arrayNoType.(*array.Int8).Value(i) - case arrow.UINT16: - value = arrayNoType.(*array.Uint16).Value(i) - case arrow.INT16: - value = arrayNoType.(*array.Int16).Value(i) - case arrow.UINT32: - value = arrayNoType.(*array.Uint32).Value(i) - case arrow.INT32: - value = arrayNoType.(*array.Int32).Value(i) - case arrow.UINT64: - value = arrayNoType.(*array.Uint64).Value(i) - case arrow.INT64: - value = arrayNoType.(*array.Int64).Value(i) - case arrow.FLOAT16: - value = arrayNoType.(*array.Float16).Value(i) - case arrow.FLOAT32: - value = arrayNoType.(*array.Float32).Value(i) - case arrow.FLOAT64: - value = arrayNoType.(*array.Float64).Value(i) - case arrow.STRING: - value = arrayNoType.(*array.String).Value(i) - case arrow.BINARY: - value = arrayNoType.(*array.Binary).Value(i) - case arrow.FIXED_SIZE_BINARY: - value = arrayNoType.(*array.FixedSizeBinary).Value(i) - case arrow.DATE32: - value = arrayNoType.(*array.Date32).Value(i) - case arrow.DATE64: - value = arrayNoType.(*array.Date64).Value(i) - case arrow.TIMESTAMP: - value = arrayNoType.(*array.Timestamp).Value(i) - case arrow.TIME32: - value = arrayNoType.(*array.Time32).Value(i) - case arrow.TIME64: - value = arrayNoType.(*array.Time64).Value(i) - case arrow.INTERVAL_MONTHS: - value = arrayNoType.(*array.MonthInterval).Value(i) - case arrow.INTERVAL_DAY_TIME: - value = arrayNoType.(*array.DayTimeInterval).Value(i) - case arrow.DECIMAL128: - value = arrayNoType.(*array.Decimal128).Value(i) - case arrow.DECIMAL256: - value = arrayNoType.(*array.Decimal256).Value(i) - case arrow.DURATION: - value = arrayNoType.(*array.Duration).Value(i) - case arrow.LARGE_STRING: - value = arrayNoType.(*array.LargeString).Value(i) - case arrow.LARGE_BINARY: - value = arrayNoType.(*array.LargeBinary).Value(i) - case arrow.INTERVAL_MONTH_DAY_NANO: - value = arrayNoType.(*array.MonthDayNanoInterval).Value(i) - default: - return nil, columnType, fmt.Errorf("not supported data type: %s", arrayNoType.DataType().ID().String()) + typeExtractor := map[arrow.Type]func(arrow.Array, int) any{ + arrow.BOOL: func(arr arrow.Array, i int) any { return arr.(*array.Boolean).Value(i) }, + arrow.UINT8: func(arr arrow.Array, i int) any { return arr.(*array.Uint8).Value(i) }, + arrow.INT8: func(arr arrow.Array, i int) any { return arr.(*array.Int8).Value(i) }, + arrow.UINT16: func(arr arrow.Array, i int) any { return arr.(*array.Uint16).Value(i) }, + arrow.INT16: func(arr arrow.Array, i int) any { return arr.(*array.Int16).Value(i) }, + arrow.UINT32: func(arr arrow.Array, i int) any { return arr.(*array.Uint32).Value(i) }, + arrow.INT32: func(arr arrow.Array, i int) any { return arr.(*array.Int32).Value(i) }, + arrow.UINT64: func(arr arrow.Array, i int) any { return arr.(*array.Uint64).Value(i) }, + arrow.INT64: func(arr arrow.Array, i int) any { return arr.(*array.Int64).Value(i) }, + arrow.FLOAT16: func(arr arrow.Array, i int) any { return arr.(*array.Float16).Value(i) }, + arrow.FLOAT32: func(arr arrow.Array, i int) any { return arr.(*array.Float32).Value(i) }, + arrow.FLOAT64: func(arr arrow.Array, i int) any { return arr.(*array.Float64).Value(i) }, + arrow.STRING: func(arr arrow.Array, i int) any { return arr.(*array.String).Value(i) }, + arrow.BINARY: func(arr arrow.Array, i int) any { return arr.(*array.Binary).Value(i) }, + arrow.FIXED_SIZE_BINARY: func(arr arrow.Array, i int) any { return arr.(*array.FixedSizeBinary).Value(i) }, + arrow.DATE32: func(arr arrow.Array, i int) any { return arr.(*array.Date32).Value(i) }, + arrow.DATE64: func(arr arrow.Array, i int) any { return arr.(*array.Date64).Value(i) }, + arrow.TIMESTAMP: func(arr arrow.Array, i int) any { return arr.(*array.Timestamp).Value(i) }, + arrow.TIME32: func(arr arrow.Array, i int) any { return arr.(*array.Time32).Value(i) }, + arrow.TIME64: func(arr arrow.Array, i int) any { return arr.(*array.Time64).Value(i) }, + arrow.INTERVAL_MONTHS: func(arr arrow.Array, i int) any { return arr.(*array.MonthInterval).Value(i) }, + arrow.INTERVAL_DAY_TIME: func(arr arrow.Array, i int) any { return arr.(*array.DayTimeInterval).Value(i) }, + arrow.DECIMAL128: func(arr arrow.Array, i int) any { return arr.(*array.Decimal128).Value(i) }, + arrow.DECIMAL256: func(arr arrow.Array, i int) any { return arr.(*array.Decimal256).Value(i) }, + arrow.DURATION: func(arr arrow.Array, i int) any { return arr.(*array.Duration).Value(i) }, + arrow.LARGE_STRING: func(arr arrow.Array, i int) any { return arr.(*array.LargeString).Value(i) }, + arrow.LARGE_BINARY: func(arr arrow.Array, i int) any { return arr.(*array.LargeBinary).Value(i) }, + arrow.INTERVAL_MONTH_DAY_NANO: func(arr arrow.Array, i int) any { return arr.(*array.MonthDayNanoInterval).Value(i) }, } - if metadata, hasMetadata := field.Metadata.GetValue("iox::column::type"); hasMetadata { - value, columnType = getMetadataType(metadata, value, columnType) + dataType := arrayNoType.DataType().ID() + if extractor, exists := typeExtractor[dataType]; exists { + value := extractor(arrayNoType, i) + + if metadata, hasMetadata := field.Metadata.GetValue("iox::column::type"); hasMetadata { + value, columnType = getMetadataType(metadata, value, columnType) + } + return value, columnType, nil } - return value, columnType, nil + + return nil, columnType, fmt.Errorf("not supported data type: %s", dataType.String()) } func getMetadataType(metadata string, value any, columnType responseColumnType) (any, responseColumnType) {