diff --git a/CHANGELOG.md b/CHANGELOG.md index cc53037..d7a2f5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,20 @@ -## 0.15.0 [unreleased] +## 1.0.0 [unreleased] + +### Breaking Changes + +:warning: **This is a breaking change release.** + +> Previously, the Query API did not respect the metadata type for columns returned from InfluxDB v3. This release fixes this issue. As a result, the type of some columns may differ from previous versions. For example, the timestamp column will now be `time.Time` instead of `arrow.Timestamp`. + +### Features + +1. [#114](https://github.com/InfluxCommunity/influxdb3-go/pull/114): Query API respects metadata types for columns returned from InfluxDB v3. + Tags are mapped as a "string", timestamp as "time.Time", and fields as their respective types: + - iox::column_type::field::integer: => int64 + - iox::column_type::field::uinteger: => uint64 + - iox::column_type::field::float: => float64 + - iox::column_type::field::string: => string + - iox::column_type::field::boolean: => bool ## 0.14.0 [2024-11-11] diff --git a/README.md b/README.md index 5947560..53fe095 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ import ( "fmt" "os" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/influxdb3/v1" ) ``` @@ -251,7 +251,9 @@ err = client.WriteData(context.Background(), data) ### Query -Use SQL or InfluxQL to query an InfluxDB v3 database or (Cloud Serverless) bucket and retrieve data in Arrow Columnar format. +Use SQL or InfluxQL to query an InfluxDB v3 database or Cloud Serverless bucket to retrieve data. +The client can return query results in the following formats: structured `PointValues` object, key-value pairs, or Arrow Columnar Format. + By default, the client sends the query as SQL. `influxdb3` provides an iterator for processing data rows--for example: @@ -275,6 +277,8 @@ if err != nil { // Process the result. for iterator.Next() { + // The query iterator returns each row as a map[string]interface{}. + // The keys are the column names, allowing you to access the values by column name. value := iterator.Value() fmt.Printf("temperature in Paris is %f\n", value["temperature"]) diff --git a/examples/Basic/basic.go b/examples/Basic/basic.go index 394957d..598056f 100644 --- a/examples/Basic/basic.go +++ b/examples/Basic/basic.go @@ -6,8 +6,7 @@ import ( "os" "time" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" - "github.com/apache/arrow/go/v15/arrow" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" ) func main() { @@ -100,9 +99,11 @@ func main() { panic(err) } for iterator.Next() { + // The query iterator returns each row as a map[string]interface{}. + // The keys are the column names, allowing you to access the values by column name. value := iterator.Value() fmt.Printf("%s at %v:\n", value["location"], - (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC822)) + (value["time"].(time.Time)).Format(time.RFC822)) fmt.Printf(" temperature: %f\n", value["temperature"]) fmt.Printf(" humidity : %d%%\n", value["humidity"]) } diff --git a/examples/Batching/batching.go b/examples/Batching/batching.go index d41052d..a0e2b4e 100644 --- a/examples/Batching/batching.go +++ b/examples/Batching/batching.go @@ -8,9 +8,8 @@ import ( "text/tabwriter" "time" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" - "github.com/InfluxCommunity/influxdb3-go/influxdb3/batching" - "github.com/apache/arrow/go/v15/arrow" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/batching" ) const NumPoints = 54 @@ -142,7 +141,7 @@ func main() { // Process the data for iterator.Next() { value := iterator.Value() - t := (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC3339) + t := (value["time"].(time.Time)).Format(time.RFC3339) fmt.Fprintf(w, "%v\t%s\t%.1f\t%d\n", t, value["location"], value["temperature"], value["humidity"]) } } diff --git a/examples/Downsampling/downsampling.go b/examples/Downsampling/downsampling.go index 1ed05ff..fb39f2b 100644 --- a/examples/Downsampling/downsampling.go +++ b/examples/Downsampling/downsampling.go @@ -3,11 +3,11 @@ package main import ( "context" "fmt" + "github.com/apache/arrow/go/v15/arrow" "os" "time" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" - "github.com/apache/arrow/go/v15/arrow" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" ) func main() { @@ -96,10 +96,10 @@ func main() { for iterator.Next() { row := iterator.AsPoints() timestamp := (row.GetField("window_start").(arrow.Timestamp)).ToTime(arrow.Nanosecond) - location := row.GetStringField("location") + location, _ := row.GetTag("location") avgValue := row.GetDoubleField("avg") maxValue := row.GetDoubleField("max") - fmt.Printf("%s %s temperature: avg %.2f, max %.2f\n", timestamp.Format(time.RFC822), *location, *avgValue, *maxValue) + fmt.Printf("%s %s temperature: avg %.2f, max %.2f\n", timestamp.Format(time.RFC822), location, *avgValue, *maxValue) // // Write back downsampled data. @@ -114,7 +114,7 @@ func main() { downsampledPoint = downsampledPoint. RemoveField("window_start"). SetTimestampWithEpoch(timestamp.UnixNano()) - + // Write the downsampled Point to the database. err = client.WritePoints(context.Background(), []*influxdb3.Point{downsampledPoint}) if err != nil { diff --git a/examples/HTTPErrorHandled/httpErrorHandled.go b/examples/HTTPErrorHandled/httpErrorHandled.go index 7809c27..e2c4b84 100644 --- a/examples/HTTPErrorHandled/httpErrorHandled.go +++ b/examples/HTTPErrorHandled/httpErrorHandled.go @@ -7,7 +7,7 @@ import ( "log" "os" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" ) // Demonstrates working with HTTP response headers in ServerError diff --git a/examples/LPBatching/lpBatching.go b/examples/LPBatching/lpBatching.go index cd4a44d..88658c8 100644 --- a/examples/LPBatching/lpBatching.go +++ b/examples/LPBatching/lpBatching.go @@ -9,9 +9,8 @@ import ( "text/tabwriter" "time" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" - "github.com/InfluxCommunity/influxdb3-go/influxdb3/batching" - "github.com/apache/arrow/go/v15/arrow" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/batching" ) const LineCount = 100 @@ -133,7 +132,7 @@ func main() { fmt.Fprintln(tw, "\nTime\tid\tlocation\tspeed\tbearing\tticks") for iterator.Next() { value := iterator.Value() - t := (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC3339) + t := (value["time"].(time.Time)).Format(time.RFC3339) _, err := fmt.Fprintf(tw, "%v\t%s\t%s\t%.1f\t%.2f\t%d\n", t, value["id"], value["location"], value["speed"], value["bearing"], value["ticks"]) if err != nil { diff --git a/examples/README.md b/examples/README.md index 4f1815f..3b0758c 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,6 +1,6 @@ # Examples -- [Write and query data](Basic/basic.go) - A complete Go example that demonstrates the different ways of writing data, and then queries your data stored in InfluxDB v3 (formerly InfluxDB IOx). +- [Write and query data](Basic/basic.go) - A complete Go example that demonstrates the different ways of writing data, and then queries your data stored in InfluxDB v3. - [Downsampling](Downsampling/downsampling.go) - A complete Go example that uses a downsampling query and then writes downsampled data back to a different table. - [HTTP Error Handling](HTTPErrorHandled/httpErrorHandled.go) - A complete Go example for reading HTTP headers in case of an server error occurs. - [Batching write](Batching/batching.go) - A complete Go example that demonstrates how to write Point data in batches. diff --git a/go.mod b/go.mod index 90aebcd..71804e2 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/InfluxCommunity/influxdb3-go +module github.com/InfluxCommunity/influxdb3-go/v1 go 1.22.7 diff --git a/influxdb3/batching/batcher.go b/influxdb3/batching/batcher.go index c9ff2ce..6bd0ba8 100644 --- a/influxdb3/batching/batcher.go +++ b/influxdb3/batching/batcher.go @@ -28,7 +28,7 @@ import ( "log/slog" "sync" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" ) // DefaultBatchSize is the default number of points emitted diff --git a/influxdb3/batching/batcher_test.go b/influxdb3/batching/batcher_test.go index c8c403c..fe6b3a7 100644 --- a/influxdb3/batching/batcher_test.go +++ b/influxdb3/batching/batcher_test.go @@ -27,7 +27,7 @@ import ( "testing" "time" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" "github.com/stretchr/testify/assert" ) diff --git a/influxdb3/batching/example_test.go b/influxdb3/batching/example_test.go index 421923c..c7dbd18 100644 --- a/influxdb3/batching/example_test.go +++ b/influxdb3/batching/example_test.go @@ -29,8 +29,8 @@ import ( "math/rand" "time" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" - "github.com/InfluxCommunity/influxdb3-go/influxdb3/batching" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/batching" ) func Example_batcher() { diff --git a/influxdb3/client_e2e_test.go b/influxdb3/client_e2e_test.go index 7e16582..17d64cb 100644 --- a/influxdb3/client_e2e_test.go +++ b/influxdb3/client_e2e_test.go @@ -37,9 +37,8 @@ import ( "testing" "time" - "github.com/InfluxCommunity/influxdb3-go/influxdb3" - "github.com/InfluxCommunity/influxdb3-go/influxdb3/batching" - "github.com/apache/arrow/go/v15/arrow" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/batching" "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -138,7 +137,7 @@ func TestWriteAndQueryExample(t *testing.T) { assert.Equal(t, uint64(800), value["uindex"]) assert.Equal(t, true, value["valid"]) assert.Equal(t, "a1", value["text"]) - assert.Equal(t, now, value["time"].(arrow.Timestamp).ToTime(arrow.Nanosecond)) + assert.Equal(t, now, value["time"]) // row #2 @@ -151,7 +150,7 @@ func TestWriteAndQueryExample(t *testing.T) { assert.Equal(t, uint64(150), value["uindex"]) assert.Equal(t, false, value["valid"]) assert.Equal(t, "b1", value["text"]) - assert.Equal(t, now.Add(1*time.Second), value["time"].(arrow.Timestamp).ToTime(arrow.Nanosecond)) + assert.Equal(t, now.Add(1*time.Second), value["time"]) assert.False(t, iterator.Done()) @@ -233,7 +232,7 @@ func TestQueryWithParameters(t *testing.T) { assert.Equal(t, uint64(800), value["uindex"]) assert.Equal(t, true, value["valid"]) assert.Equal(t, "a1", value["text"]) - assert.Equal(t, now, value["time"].(arrow.Timestamp).ToTime(arrow.Nanosecond)) + assert.Equal(t, now, value["time"]) assert.False(t, iterator.Done()) assert.False(t, iterator.Next()) diff --git a/influxdb3/query.go b/influxdb3/query.go index 854cec3..7a70210 100644 --- a/influxdb3/query.go +++ b/influxdb3/query.go @@ -74,20 +74,20 @@ func (c *Client) setQueryClient(flightClient flight.Client) { // QueryParameters is a type for query parameters. type QueryParameters = map[string]any -// Query queries data from InfluxDB IOx. +// Query queries data from InfluxDB v3. // Parameters: // - ctx: The context.Context to use for the request. // - query: The query string to execute. // - options: The optional query options. See QueryOption for available options. // // Returns: -// - A custom iterator (*QueryIterator). +// - A result iterator (*QueryIterator). // - An error, if any. func (c *Client) Query(ctx context.Context, query string, options ...QueryOption) (*QueryIterator, error) { return c.query(ctx, query, nil, newQueryOptions(&DefaultQueryOptions, options)) } -// QueryWithParameters queries data from InfluxDB IOx with parameterized query. +// QueryWithParameters queries data from InfluxDB v3 with parameterized query. // Parameters: // - ctx: The context.Context to use for the request. // - query: The query string to execute. @@ -95,21 +95,21 @@ func (c *Client) Query(ctx context.Context, query string, options ...QueryOption // - options: The optional query options. See QueryOption for available options. // // Returns: -// - A custom iterator (*QueryIterator). +// - A result iterator (*QueryIterator). // - An error, if any. func (c *Client) QueryWithParameters(ctx context.Context, query string, parameters QueryParameters, options ...QueryOption) (*QueryIterator, error) { return c.query(ctx, query, parameters, newQueryOptions(&DefaultQueryOptions, options)) } -// QueryWithOptions Query data from InfluxDB IOx with query options. +// QueryWithOptions Query data from InfluxDB v3 with query options. // Parameters: // - ctx: The context.Context to use for the request. // - options: Query options (query type, optional database). // - query: The query string to execute. // // Returns: -// - A custom iterator (*QueryIterator) that can also be used to get raw flightsql reader. +// - A result iterator (*QueryIterator). // - An error, if any. // // Deprecated: use Query with variadic QueryOption options. diff --git a/influxdb3/query_iterator.go b/influxdb3/query_iterator.go index 652a482..537c1e7 100644 --- a/influxdb3/query_iterator.go +++ b/influxdb3/query_iterator.go @@ -24,16 +24,30 @@ package influxdb3 import ( "fmt" - "strings" + "time" "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" "github.com/apache/arrow/go/v15/arrow/flight" ) +type responseColumnType byte + +const ( + responseColumnTypeUnknown responseColumnType = iota + responseColumnTypeTimestamp + responseColumnTypeField + responseColumnTypeTag +) + // QueryIterator is a custom query iterator that encapsulates and simplifies the logic for // the flight reader. It provides methods such as Next, Value, and Index to consume the flight reader, // or users can use the underlying reader directly with the Raw method. +// +// The QueryIterator can return responses as one of the following data types: +// - iterator.Value() returns map[string]interface{} object representing the current row +// - iterator.AsPoints() returns *PointValues object representing the current row +// - iterator.Raw() returns the underlying *flight.Reader object type QueryIterator struct { reader *flight.Reader // Current record @@ -77,13 +91,13 @@ func (i *QueryIterator) Next() bool { i.indexInRecord = 0 } - schema := i.reader.Schema() + readerSchema := i.reader.Schema() obj := make(map[string]interface{}, len(i.record.Columns())) for ci, col := range i.record.Columns() { - name := schema.Field(ci).Name - value, err := getArrowValue(col, i.indexInRecord) - + field := readerSchema.Field(ci) + name := field.Name + value, _, err := getArrowValue(col, field, i.indexInRecord) if err != nil { panic(err) } @@ -95,15 +109,15 @@ func (i *QueryIterator) Next() bool { return true } -// AsPoints return data from InfluxDB IOx into PointValues structure. +// AsPoints return data from InfluxDB v3 into PointValues structure. func (i *QueryIterator) AsPoints() *PointValues { readerSchema := i.reader.Schema() p := NewPointValues("") for ci, col := range i.record.Columns() { - schema := readerSchema.Field(ci) - name := schema.Name - value, err := getArrowValue(col, i.indexInRecord) + field := readerSchema.Field(ci) + name := field.Name + value, columnType, err := getArrowValue(col, field, i.indexInRecord) if err != nil { panic(err) } @@ -111,31 +125,24 @@ func (i *QueryIterator) AsPoints() *PointValues { continue } - metadataType, hasMetadataType := schema.Metadata.GetValue("iox::column::type") - if stringValue, isString := value.(string); ((name == "measurement") || (name == "iox::measurement")) && isString { p.SetMeasurement(stringValue) continue } - if !hasMetadataType { + switch { + case columnType == responseColumnTypeUnknown: if timestampValue, isTimestamp := value.(arrow.Timestamp); isTimestamp && name == "time" { p.SetTimestamp(timestampValue.ToTime(arrow.Nanosecond)) } else { p.SetField(name, value) } - continue - } - - parts := strings.Split(metadataType, "::") - _, _, valueType := parts[0], parts[1], parts[2] - - if valueType == "field" { + case columnType == responseColumnTypeField: p.SetField(name, value) - } else if stringValue, isString := value.(string); isString && valueType == "tag" { - p.SetTag(name, stringValue) - } else if timestampValue, isTimestamp := value.(arrow.Timestamp); isTimestamp && valueType == "timestamp" { - p.SetTimestamp(timestampValue.ToTime(arrow.Nanosecond)) + case columnType == responseColumnTypeTag: + p.SetTag(name, value.(string)) + case columnType == responseColumnTypeTimestamp: + p.SetTimestamp(value.(time.Time)) } } @@ -145,6 +152,16 @@ func (i *QueryIterator) AsPoints() *PointValues { // Value returns the current value from the flight reader as a map object. // The map contains the fields and tags as key-value pairs. // +// The current value types respect metadata provided by InfluxDB v3 metadata query response. +// Tags are mapped as a "string", timestamp as "time.Time", and fields as their respective types. +// +// Field are mapped to the following types: +// - iox::column_type::field::integer: => int64 +// - iox::column_type::field::uinteger: => uint64 +// - iox::column_type::field::float: => float64 +// - iox::column_type::field::string: => string +// - iox::column_type::field::boolean: => bool +// // Returns: // - A map[string]interface{} object representing the current value. func (i *QueryIterator) Value() map[string]interface{} { @@ -177,90 +194,118 @@ func (i *QueryIterator) Raw() *flight.Reader { return i.reader } -func getArrowValue(arrayNoType arrow.Array, i int) (interface{}, error) { +func getArrowValue(arrayNoType arrow.Array, field arrow.Field, i int) (any, responseColumnType, error) { + var columnType = responseColumnTypeUnknown if arrayNoType.IsNull(i) { - return nil, nil + return nil, columnType, nil } + var value any switch arrayNoType.DataType().ID() { case arrow.NULL: - return nil, nil + value = nil case arrow.BOOL: - return arrayNoType.(*array.Boolean).Value(i), nil + value = arrayNoType.(*array.Boolean).Value(i) case arrow.UINT8: - return arrayNoType.(*array.Uint8).Value(i), nil + value = arrayNoType.(*array.Uint8).Value(i) case arrow.INT8: - return arrayNoType.(*array.Int8).Value(i), nil + value = arrayNoType.(*array.Int8).Value(i) case arrow.UINT16: - return arrayNoType.(*array.Uint16).Value(i), nil + value = arrayNoType.(*array.Uint16).Value(i) case arrow.INT16: - return arrayNoType.(*array.Int16).Value(i), nil + value = arrayNoType.(*array.Int16).Value(i) case arrow.UINT32: - return arrayNoType.(*array.Uint32).Value(i), nil + value = arrayNoType.(*array.Uint32).Value(i) case arrow.INT32: - return arrayNoType.(*array.Int32).Value(i), nil + value = arrayNoType.(*array.Int32).Value(i) case arrow.UINT64: - return arrayNoType.(*array.Uint64).Value(i), nil + value = arrayNoType.(*array.Uint64).Value(i) case arrow.INT64: - return arrayNoType.(*array.Int64).Value(i), nil + value = arrayNoType.(*array.Int64).Value(i) case arrow.FLOAT16: - return arrayNoType.(*array.Float16).Value(i), nil + value = arrayNoType.(*array.Float16).Value(i) case arrow.FLOAT32: - return arrayNoType.(*array.Float32).Value(i), nil + value = arrayNoType.(*array.Float32).Value(i) case arrow.FLOAT64: - return arrayNoType.(*array.Float64).Value(i), nil + value = arrayNoType.(*array.Float64).Value(i) case arrow.STRING: - return arrayNoType.(*array.String).Value(i), nil + value = arrayNoType.(*array.String).Value(i) case arrow.BINARY: - return arrayNoType.(*array.Binary).Value(i), nil + value = arrayNoType.(*array.Binary).Value(i) case arrow.FIXED_SIZE_BINARY: - return arrayNoType.(*array.FixedSizeBinary).Value(i), nil + value = arrayNoType.(*array.FixedSizeBinary).Value(i) case arrow.DATE32: - return arrayNoType.(*array.Date32).Value(i), nil + value = arrayNoType.(*array.Date32).Value(i) case arrow.DATE64: - return arrayNoType.(*array.Date64).Value(i), nil + value = arrayNoType.(*array.Date64).Value(i) case arrow.TIMESTAMP: - return arrayNoType.(*array.Timestamp).Value(i), nil + value = arrayNoType.(*array.Timestamp).Value(i) case arrow.TIME32: - return arrayNoType.(*array.Time32).Value(i), nil + value = arrayNoType.(*array.Time32).Value(i) case arrow.TIME64: - return arrayNoType.(*array.Time64).Value(i), nil + value = arrayNoType.(*array.Time64).Value(i) case arrow.INTERVAL_MONTHS: - return arrayNoType.(*array.MonthInterval).Value(i), nil + value = arrayNoType.(*array.MonthInterval).Value(i) case arrow.INTERVAL_DAY_TIME: - return arrayNoType.(*array.DayTimeInterval).Value(i), nil + value = arrayNoType.(*array.DayTimeInterval).Value(i) case arrow.DECIMAL128: - return arrayNoType.(*array.Decimal128).Value(i), nil + value = arrayNoType.(*array.Decimal128).Value(i) case arrow.DECIMAL256: - return arrayNoType.(*array.Decimal256).Value(i), nil - // case arrow.LIST: - // return arrayNoType.(*array.List).Value(i), nil - // case arrow.STRUCT: - // return arrayNoType.(*array.Struct).Value(i), nil - // case arrow.SPARSE_UNION: - // return arrayNoType.(*array.SparseUnion).Value(i), nil - // case arrow.DENSE_UNION: - // return arrayNoType.(*array.DenseUnion).Value(i), nil - // case arrow.DICTIONARY: - // return arrayNoType.(*array.Dictionary).Value(i), nil - // case arrow.MAP: - // return arrayNoType.(*array.Map).Value(i), nil - // case arrow.EXTENSION: - // return arrayNoType.(*array.ExtensionArrayBase).Value(i), nil - // case arrow.FIXED_SIZE_LIST: - // return arrayNoType.(*array.FixedSizeList).Value(i), nil + value = arrayNoType.(*array.Decimal256).Value(i) case arrow.DURATION: - return arrayNoType.(*array.Duration).Value(i), nil + value = arrayNoType.(*array.Duration).Value(i) case arrow.LARGE_STRING: - return arrayNoType.(*array.LargeString).Value(i), nil + value = arrayNoType.(*array.LargeString).Value(i) case arrow.LARGE_BINARY: - return arrayNoType.(*array.LargeBinary).Value(i), nil - // case arrow.LARGE_LIST: - // return arrayNoType.(*array.LargeList).Value(i), nil + value = arrayNoType.(*array.LargeBinary).Value(i) case arrow.INTERVAL_MONTH_DAY_NANO: - return arrayNoType.(*array.MonthDayNanoInterval).Value(i), nil - // case arrow.RUN_END_ENCODED: - // return arrayNoType.(*array.RunEndEncoded).Value(i), nil + value = arrayNoType.(*array.MonthDayNanoInterval).Value(i) default: - return nil, fmt.Errorf("not supported data type: %s", arrayNoType.DataType().ID().String()) + return nil, columnType, fmt.Errorf("not supported data type: %s", arrayNoType.DataType().ID().String()) + } + + if metadata, hasMetadata := field.Metadata.GetValue("iox::column::type"); hasMetadata { + value, columnType = getMetadataType(metadata, value, columnType) + } + return value, columnType, nil +} + +func getMetadataType(metadata string, value any, columnType responseColumnType) (any, responseColumnType) { + switch metadata { + case "iox::column_type::field::integer": + if intValue, ok := value.(int64); ok { + value = intValue + columnType = responseColumnTypeField + } + case "iox::column_type::field::uinteger": + if uintValue, ok := value.(uint64); ok { + value = uintValue + columnType = responseColumnTypeField + } + case "iox::column_type::field::float": + if floatValue, ok := value.(float64); ok { + value = floatValue + columnType = responseColumnTypeField + } + case "iox::column_type::field::string": + if stringValue, ok := value.(string); ok { + value = stringValue + columnType = responseColumnTypeField + } + case "iox::column_type::field::boolean": + if boolValue, ok := value.(bool); ok { + value = boolValue + columnType = responseColumnTypeField + } + case "iox::column_type::tag": + if stringValue, ok := value.(string); ok { + value = stringValue + columnType = responseColumnTypeTag + } + case "iox::column_type::timestamp": + if timestampValue, ok := value.(arrow.Timestamp); ok { + value = timestampValue.ToTime(arrow.Nanosecond) + columnType = responseColumnTypeTimestamp + } } + return value, columnType } diff --git a/influxdb3/version.go b/influxdb3/version.go index 04db937..2a74645 100644 --- a/influxdb3/version.go +++ b/influxdb3/version.go @@ -27,7 +27,7 @@ import ( ) // version defines current version -const version = "0.15.0" +const version = "1.0.0" // userAgent header value const userAgent = "influxdb3-go/" + version + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")" diff --git a/influxdb3/write.go b/influxdb3/write.go index 2a9467e..de796e0 100644 --- a/influxdb3/write.go +++ b/influxdb3/write.go @@ -33,7 +33,7 @@ import ( "strings" "time" - "github.com/InfluxCommunity/influxdb3-go/influxdb3/gzip" + "github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/gzip" "github.com/influxdata/line-protocol/v2/lineprotocol" )