Skip to content

Commit

Permalink
feat: respect iox::column_type::field metadata when mapping query r…
Browse files Browse the repository at this point in the history
…esults into values (#114)

* feat: respect `iox::column_type::field` metadata when mapping query results into values

* chore: prepare major version 1
  • Loading branch information
bednar authored Nov 15, 2024
1 parent 9293f5f commit ed5570e
Show file tree
Hide file tree
Showing 17 changed files with 178 additions and 115 deletions.
18 changes: 17 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
## 0.15.0 [unreleased]
## 1.0.0 [unreleased]

### Breaking Changes

:warning: **This is a breaking change release.**

> Previously, the Query API did not respect the metadata type for columns returned from InfluxDB v3. This release fixes this issue. As a result, the type of some columns may differ from previous versions. For example, the timestamp column will now be `time.Time` instead of `arrow.Timestamp`.
### Features

1. [#114](https://github.com/InfluxCommunity/influxdb3-go/pull/114): Query API respects metadata types for columns returned from InfluxDB v3.
Tags are mapped as a "string", timestamp as "time.Time", and fields as their respective types:
- iox::column_type::field::integer: => int64
- iox::column_type::field::uinteger: => uint64
- iox::column_type::field::float: => float64
- iox::column_type::field::string: => string
- iox::column_type::field::boolean: => bool

## 0.14.0 [2024-11-11]

Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ import (
"fmt"
"os"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/v1"
)
```

Expand Down Expand Up @@ -251,7 +251,9 @@ err = client.WriteData(context.Background(), data)

### Query

Use SQL or InfluxQL to query an InfluxDB v3 database or (Cloud Serverless) bucket and retrieve data in Arrow Columnar format.
Use SQL or InfluxQL to query an InfluxDB v3 database or Cloud Serverless bucket to retrieve data.
The client can return query results in the following formats: structured `PointValues` object, key-value pairs, or Arrow Columnar Format.

By default, the client sends the query as SQL.

`influxdb3` provides an iterator for processing data rows--for example:
Expand All @@ -275,6 +277,8 @@ if err != nil {

// Process the result.
for iterator.Next() {
// The query iterator returns each row as a map[string]interface{}.
// The keys are the column names, allowing you to access the values by column name.
value := iterator.Value()

fmt.Printf("temperature in Paris is %f\n", value["temperature"])
Expand Down
7 changes: 4 additions & 3 deletions examples/Basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"os"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/apache/arrow/go/v15/arrow"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
)

func main() {
Expand Down Expand Up @@ -100,9 +99,11 @@ func main() {
panic(err)
}
for iterator.Next() {
// The query iterator returns each row as a map[string]interface{}.
// The keys are the column names, allowing you to access the values by column name.
value := iterator.Value()
fmt.Printf("%s at %v:\n", value["location"],
(value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC822))
(value["time"].(time.Time)).Format(time.RFC822))
fmt.Printf(" temperature: %f\n", value["temperature"])
fmt.Printf(" humidity : %d%%\n", value["humidity"])
}
Expand Down
7 changes: 3 additions & 4 deletions examples/Batching/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
"text/tabwriter"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
"github.com/apache/arrow/go/v15/arrow"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/batching"
)

const NumPoints = 54
Expand Down Expand Up @@ -142,7 +141,7 @@ func main() {
// Process the data
for iterator.Next() {
value := iterator.Value()
t := (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC3339)
t := (value["time"].(time.Time)).Format(time.RFC3339)
fmt.Fprintf(w, "%v\t%s\t%.1f\t%d\n", t, value["location"], value["temperature"], value["humidity"])
}
}
10 changes: 5 additions & 5 deletions examples/Downsampling/downsampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package main
import (
"context"
"fmt"
"github.com/apache/arrow/go/v15/arrow"
"os"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/apache/arrow/go/v15/arrow"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
)

func main() {
Expand Down Expand Up @@ -96,10 +96,10 @@ func main() {
for iterator.Next() {
row := iterator.AsPoints()
timestamp := (row.GetField("window_start").(arrow.Timestamp)).ToTime(arrow.Nanosecond)
location := row.GetStringField("location")
location, _ := row.GetTag("location")
avgValue := row.GetDoubleField("avg")
maxValue := row.GetDoubleField("max")
fmt.Printf("%s %s temperature: avg %.2f, max %.2f\n", timestamp.Format(time.RFC822), *location, *avgValue, *maxValue)
fmt.Printf("%s %s temperature: avg %.2f, max %.2f\n", timestamp.Format(time.RFC822), location, *avgValue, *maxValue)

//
// Write back downsampled data.
Expand All @@ -114,7 +114,7 @@ func main() {
downsampledPoint = downsampledPoint.
RemoveField("window_start").
SetTimestampWithEpoch(timestamp.UnixNano())

// Write the downsampled Point to the database.
err = client.WritePoints(context.Background(), []*influxdb3.Point{downsampledPoint})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/HTTPErrorHandled/httpErrorHandled.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"log"
"os"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
)

// Demonstrates working with HTTP response headers in ServerError
Expand Down
7 changes: 3 additions & 4 deletions examples/LPBatching/lpBatching.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (
"text/tabwriter"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
"github.com/apache/arrow/go/v15/arrow"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/batching"
)

const LineCount = 100
Expand Down Expand Up @@ -133,7 +132,7 @@ func main() {
fmt.Fprintln(tw, "\nTime\tid\tlocation\tspeed\tbearing\tticks")
for iterator.Next() {
value := iterator.Value()
t := (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC3339)
t := (value["time"].(time.Time)).Format(time.RFC3339)
_, err := fmt.Fprintf(tw, "%v\t%s\t%s\t%.1f\t%.2f\t%d\n", t,
value["id"], value["location"], value["speed"], value["bearing"], value["ticks"])
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Examples

- [Write and query data](Basic/basic.go) - A complete Go example that demonstrates the different ways of writing data, and then queries your data stored in InfluxDB v3 (formerly InfluxDB IOx).
- [Write and query data](Basic/basic.go) - A complete Go example that demonstrates the different ways of writing data, and then queries your data stored in InfluxDB v3.
- [Downsampling](Downsampling/downsampling.go) - A complete Go example that uses a downsampling query and then writes downsampled data back to a different table.
- [HTTP Error Handling](HTTPErrorHandled/httpErrorHandled.go) - A complete Go example for reading HTTP headers in case of an server error occurs.
- [Batching write](Batching/batching.go) - A complete Go example that demonstrates how to write Point data in batches.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/InfluxCommunity/influxdb3-go
module github.com/InfluxCommunity/influxdb3-go/v1

go 1.22.7

Expand Down
2 changes: 1 addition & 1 deletion influxdb3/batching/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"log/slog"
"sync"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
)

// DefaultBatchSize is the default number of points emitted
Expand Down
2 changes: 1 addition & 1 deletion influxdb3/batching/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"testing"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
"github.com/stretchr/testify/assert"
)

Expand Down
4 changes: 2 additions & 2 deletions influxdb3/batching/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"math/rand"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/batching"
)

func Example_batcher() {
Expand Down
11 changes: 5 additions & 6 deletions influxdb3/client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ import (
"testing"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
"github.com/apache/arrow/go/v15/arrow"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/v1/influxdb3/batching"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -138,7 +137,7 @@ func TestWriteAndQueryExample(t *testing.T) {
assert.Equal(t, uint64(800), value["uindex"])
assert.Equal(t, true, value["valid"])
assert.Equal(t, "a1", value["text"])
assert.Equal(t, now, value["time"].(arrow.Timestamp).ToTime(arrow.Nanosecond))
assert.Equal(t, now, value["time"])

// row #2

Expand All @@ -151,7 +150,7 @@ func TestWriteAndQueryExample(t *testing.T) {
assert.Equal(t, uint64(150), value["uindex"])
assert.Equal(t, false, value["valid"])
assert.Equal(t, "b1", value["text"])
assert.Equal(t, now.Add(1*time.Second), value["time"].(arrow.Timestamp).ToTime(arrow.Nanosecond))
assert.Equal(t, now.Add(1*time.Second), value["time"])

assert.False(t, iterator.Done())

Expand Down Expand Up @@ -233,7 +232,7 @@ func TestQueryWithParameters(t *testing.T) {
assert.Equal(t, uint64(800), value["uindex"])
assert.Equal(t, true, value["valid"])
assert.Equal(t, "a1", value["text"])
assert.Equal(t, now, value["time"].(arrow.Timestamp).ToTime(arrow.Nanosecond))
assert.Equal(t, now, value["time"])

assert.False(t, iterator.Done())
assert.False(t, iterator.Next())
Expand Down
12 changes: 6 additions & 6 deletions influxdb3/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,42 +74,42 @@ func (c *Client) setQueryClient(flightClient flight.Client) {
// QueryParameters is a type for query parameters.
type QueryParameters = map[string]any

// Query queries data from InfluxDB IOx.
// Query queries data from InfluxDB v3.
// Parameters:
// - ctx: The context.Context to use for the request.
// - query: The query string to execute.
// - options: The optional query options. See QueryOption for available options.
//
// Returns:
// - A custom iterator (*QueryIterator).
// - A result iterator (*QueryIterator).
// - An error, if any.
func (c *Client) Query(ctx context.Context, query string, options ...QueryOption) (*QueryIterator, error) {
return c.query(ctx, query, nil, newQueryOptions(&DefaultQueryOptions, options))
}

// QueryWithParameters queries data from InfluxDB IOx with parameterized query.
// QueryWithParameters queries data from InfluxDB v3 with parameterized query.
// Parameters:
// - ctx: The context.Context to use for the request.
// - query: The query string to execute.
// - parameters: The query parameters.
// - options: The optional query options. See QueryOption for available options.
//
// Returns:
// - A custom iterator (*QueryIterator).
// - A result iterator (*QueryIterator).
// - An error, if any.
func (c *Client) QueryWithParameters(ctx context.Context, query string, parameters QueryParameters,
options ...QueryOption) (*QueryIterator, error) {
return c.query(ctx, query, parameters, newQueryOptions(&DefaultQueryOptions, options))
}

// QueryWithOptions Query data from InfluxDB IOx with query options.
// QueryWithOptions Query data from InfluxDB v3 with query options.
// Parameters:
// - ctx: The context.Context to use for the request.
// - options: Query options (query type, optional database).
// - query: The query string to execute.
//
// Returns:
// - A custom iterator (*QueryIterator) that can also be used to get raw flightsql reader.
// - A result iterator (*QueryIterator).
// - An error, if any.
//
// Deprecated: use Query with variadic QueryOption options.
Expand Down
Loading

0 comments on commit ed5570e

Please sign in to comment.