forked from bippio/go-impala
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetadata.go
99 lines (86 loc) · 3 KB
/
metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package impala
import (
"context"
"database/sql"
"errors"
"iter"
"slices"
"github.com/sclgo/impala-go/internal/hive"
"github.com/sclgo/impala-go/internal/isql"
)
// Metadata exposes the schema and other metadata in an Impala instance.
// It is backed either by a database handle (see NewMetadata) or by a single
// caller-owned connection (see NewMetadataFromConn).
type Metadata struct {
// db is the handle from which a new connection is obtained for each call when conn is nil.
db *sql.DB
// conn, when non-nil, is used for every call instead of obtaining a connection from db.
conn ConnRawAccess
}
// ConnRawAccess exposes the Raw method of sql.Conn.
// *sql.Conn satisfies this interface, so an existing connection can be handed
// to NewMetadataFromConn directly.
type ConnRawAccess interface {
// Raw mirrors (*sql.Conn).Raw: the callback receives the underlying driver connection.
Raw(func(driverConn any) error) error
}
// TableName contains all attributes that identify a table.
// It is an alias of hive.TableName, so the two types are interchangeable.
type TableName = hive.TableName
// NOTE: It is questionable whether a public package should expose a type alias to an
// internal package. This will change if it becomes an issue.
// NewMetadata returns a Metadata that uses the given Impala DB as its data source.
// Every call on the result obtains a new connection from db. If that is a problem,
// use NewMetadataFromConn to bind the Metadata to one existing connection instead.
func NewMetadata(db *sql.DB) *Metadata {
	m := new(Metadata)
	m.db = db
	return m
}
// NewMetadataFromConn returns a Metadata that uses the given Impala connection as its
// data source. *sql.Conn implements ConnRawAccess, so it can be passed here directly.
func NewMetadataFromConn(conn ConnRawAccess) *Metadata {
	var m Metadata
	m.conn = conn
	return &m
}
// GetTables retrieves the tables and views whose schema and name match the provided
// LIKE patterns. The result is fully collected before the call returns.
func (m Metadata) GetTables(ctx context.Context, schemaPattern string, tableNamePattern string) ([]TableName, error) {
	// The lookup is expressed as a sequence producer so raw can run it on the driver connection.
	listTables := func(meta hive.DBMetadata) (iter.Seq[hive.TableName], *error) {
		return meta.GetTablesSeq(ctx, schemaPattern, tableNamePattern)
	}
	return raw(ctx, m.db, m.conn, listTables)
}
// GetSchemas retrieves the schemas whose name matches the provided LIKE pattern.
// The result is fully collected before the call returns.
func (m Metadata) GetSchemas(ctx context.Context, schemaPattern string) ([]string, error) {
	// The lookup is expressed as a sequence producer so raw can run it on the driver connection.
	listSchemas := func(meta hive.DBMetadata) (iter.Seq[string], *error) {
		return meta.GetSchemasSeq(ctx, schemaPattern)
	}
	return raw(ctx, m.db, m.conn, listSchemas)
}
// raw executes the given sequence-producing function f against the driver-level
// connection and returns the collected results.
//
// If dbconn is non-nil it is used as-is and left open for its owner. Otherwise a
// dedicated connection is obtained from db for the duration of the call and closed
// before returning. It is an error for both db and dbconn to be nil.
//
// The results are named so the deferred Close can report its failure: a deferred
// assignment to an ordinary local would run after the return values were already
// set and be lost. A Close error is joined with any error from f rather than
// replacing it, so neither hides the other.
func raw[T any](ctx context.Context, db *sql.DB, dbconn ConnRawAccess, f func(hive.DBMetadata) (iter.Seq[T], *error)) (res []T, err error) {
	if dbconn == nil {
		if db == nil {
			// Guard against a zero-value Metadata; db.Conn would panic on a nil *sql.DB.
			return nil, errors.New("metadata has neither a database nor a connection")
		}
		conn, connErr := db.Conn(ctx)
		if connErr != nil {
			return nil, connErr
		}
		defer func() {
			// Only touch err when Close actually fails so a successful Close
			// leaves the original error value (and its identity) untouched.
			if closeErr := conn.Close(); closeErr != nil {
				err = errors.Join(err, closeErr)
			}
		}()
		dbconn = conn
	}
	return execOnRaw(ctx, dbconn, f)
}
// execOnRaw runs f inside conn.Raw against the metadata facade of an Impala session
// and returns every element of the produced sequence as a slice.
//
// The driver connection handed to the Raw callback must be an *isql.Conn; any other
// driver is rejected with an error. f returns the sequence together with a pointer to
// an error, which is read once before the sequence is consumed and again afterwards.
func execOnRaw[T any](ctx context.Context, conn ConnRawAccess, f func(hive.DBMetadata) (iter.Seq[T], *error)) ([]T, error) {
	var collected []T

	collect := func(driverConn any) error {
		impalaConn, isImpala := driverConn.(*isql.Conn)
		if !isImpala {
			return errors.New("metadata can operate only on Impala drivers")
		}

		session, openErr := impalaConn.OpenSession(ctx)
		if openErr != nil {
			return openErr
		}

		seq, seqErr := f(session.DBMetadata())
		if *seqErr != nil {
			return *seqErr
		}

		// The driver connection might not be valid once Raw returns, so the sequence
		// is drained here and only the materialized slice leaves the callback.
		collected = slices.Collect(seq)
		return *seqErr
	}

	err := conn.Raw(collect)
	return collected, err
}