Skip to content

Commit

Permalink
feat: WIP - starting batch interface writes - feature request #62
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Feb 23, 2024
1 parent d87f91b commit eba1487
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 0 deletions.
79 changes: 79 additions & 0 deletions influxdb3/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,85 @@ func (c *Client) WriteDataWithOptions(ctx context.Context, options *WriteOptions
return c.WriteWithOptions(ctx, options, buff)
}

// WriteBatchDataWithOptions writes batch data from an array of interfaces
//
// TODO comment me - placeholder for linter
//
func (c *Client) WriteBatchDataWithOptions(ctx context.Context,
options *WriteOptions,
ifaces interface{}) error {

t := reflect.TypeOf(ifaces)
va := reflect.ValueOf(ifaces)

/* DEBUG BLOCK
arr := make([]any, 0)
fmt.Printf("DEBUG typeof ifaces: %+v\n", reflect.TypeOf(ifaces))
fmt.Printf("DEBUG kind of ifaces: %+v\n", t.Kind())
fmt.Printf("DEBUG is array %t\n", t.Kind() == reflect.Array || t.Kind() == reflect.Slice)
fmt.Printf("DEBUG CanConvert([]array) %t\n", va.CanConvert(reflect.TypeOf(arr))) */

if !(t.Kind() == reflect.Array || t.Kind() == reflect.Slice) {
return fmt.Errorf("The type %s is unsupported. "+
"This method handles only slices and arrays of annotated structures.\n", t.Kind())
}

/* DEBUG BLOCK
elemK := va.Index(0).Kind()
elemV := va.Index(0)
elemT := va.Index(0).Type()
if elemK == reflect.Ptr {
fmt.Printf("DEBUG got pointer\n")
elemV = elemV.Elem()
elemT = elemT.Elem()
}
fmt.Printf("DEBUG elemK: %+v\n", elemK)
fmt.Printf("DEBUG elemV: %+v\n", elemV)
fmt.Printf("DEBUG elemT: %+v\n", elemT)
fmt.Printf("DEBUG fields elemT: %+v\n", reflect.VisibleFields(elemT))
fields := reflect.VisibleFields(elemT)
for _, f := range fields {
fmt.Printf(" DEBUG has field %s with tag %+v\n", f.Name, f.Tag)
}
*/

if va.Len() < 1 {
return nil
}
/* DEBUG BLOCK */
for i := 0; i < va.Len(); i++ {
fmt.Printf("DEBUG [%d] type: %s, kind: %s, %+v\n", i,
va.Index(i).Type(),
va.Index(i).Kind(),
va.Index(i))
}

/* */
var buff []byte

for i := 0; i < va.Len(); i++ {
fmt.Printf("DEBUG encoding iface[%d]: %+v\n", i, va.Index(i))
byts, err := encode(va.Index(i).Interface(), options)
if err != nil {
return fmt.Errorf("error encoding point: %w", err)
//panic(fmt.Sprintf("error encoding point: %s", err.Error()))
}
fmt.Printf("DEBUG appending byts: %+v\n", byts)
buff = append(buff, byts...)
}
fmt.Printf("DEBUG ctx: %+v\n", ctx)
fmt.Printf("DEBUG buff: %+v\n", buff)
//return nil
return c.WriteWithOptions(ctx, options, buff) // -- todo uncomment after basic tests in place
}

func encode(x interface{}, options *WriteOptions) ([]byte, error) {
if err := checkContainerType(x, false, "point"); err != nil {
return nil, err
Expand Down
114 changes: 114 additions & 0 deletions influxdb3/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -674,3 +675,116 @@ func TestWriteWithOptionsNotSet(t *testing.T) {
assert.Error(t, err)
assert.EqualError(t, err, "options not set")
}

type Widget struct {
Class string `lp:"measurement"`
Model string `lp:"tag,model"`
Name string `lp:"field,name"`
Charge float64 `lp:"field,charge"`
Quanta int `lp:"field,quanta"`
// Time time.Time `lp:"timestamp"`
}

func TestClient_WriteBatchDataPtrWithOptionsPtr(t *testing.T) {
widgets := []*Widget{
&Widget{
"widget",
"A",
"able",
math.E,
3,
// time.Now(),
},
&Widget{
"widget",
"A",
"baker",
math.Pi * -2,
1,
// time.Now().Add(-10 * time.Second),
},
&Widget{
"widget",
"T",
"charlie",
math.Pi,
1,
// time.Now().Add(-20 * time.Second),
},
}
correctPath := "/api/v2/write?bucket=x-db&org=my-org&precision=s"
lp := `widget,defaultTag=default,model=A charge=2.718281828459045,name="able",quanta=3i
widget,defaultTag=default,model=A charge=-6.283185307179586,name="baker",quanta=1i
widget,defaultTag=default,model=T charge=3.141592653589793,name="charlie",quanta=1i
`
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// initialization of query client
if r.Method == "PRI" {
return
}
assert.EqualValues(t, correctPath, r.URL.String())
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
assert.Equal(t, lp, string(body))
w.WriteHeader(204)
}))
defer ts.Close()
c, err := New(ClientConfig{
Host: ts.URL,
Token: "my-token",
Organization: "my-org",
Database: "my-database",
})
options := WriteOptions{
Database: "x-db",
Precision: lineprotocol.Second,
DefaultTags: map[string]string{
"defaultTag": "default",
},
}
require.NoError(t, err)
werr := c.WriteBatchDataWithOptions(context.Background(), &options, widgets)
if werr != nil {
fmt.Printf("Caught werr: %s\n", werr.Error())
}
//berr := c.WriteBatchDataWithOptions(context.Background(), &options, "A String")
//assert.NotNil(t, berr)
//fmt.Printf("Got berr: %s\n", berr.Error())
}

func TestClient_WriteBatchDataWithOptionsBadData(t *testing.T) {
options := WriteOptions{
Database: "x-db",
Precision: lineprotocol.Second,
DefaultTags: map[string]string{
"defaultTag": "default",
},
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// initialization of query client
if r.Method == "PRI" {
return
}
correctPath := "/api/v2/write?bucket=x-db&org=my-org&precision=s"
assert.EqualValues(t, correctPath, r.URL.String())
//body, err := io.ReadAll(r.Body)
//require.NoError(t, err)
//assert.Equal(t, lp, string(body))
w.WriteHeader(204)
}))
defer ts.Close()
c, err := New(ClientConfig{
Host: ts.URL,
Token: "my-token",
Organization: "my-org",
Database: "my-database",
})

if err != nil {
assert.Failf(t, "Failed to get client: %s\n", err.Error())
}

berr := c.WriteBatchDataWithOptions(context.Background(), &options, "A String")
assert.NotNil(t, berr)
fmt.Printf("Got berr: %s\n", berr.Error())
}

0 comments on commit eba1487

Please sign in to comment.