Skip to content

Commit

Permalink
Merge pull request #17 from bippio/timeout-option
Browse files Browse the repository at this point in the history
Timeout option
  • Loading branch information
vishjosh authored May 26, 2019
2 parents 3bf1ead + 3e53a96 commit 0b96a44
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 29 deletions.
48 changes: 28 additions & 20 deletions cmd/impala/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func main() {
flag.StringVar(&opts.CACertPath, "ca-cert", "", "ca certificate path")
flag.IntVar(&opts.BatchSize, "batch-size", 1024, "fetch batch size")
flag.StringVar(&opts.MemoryLimit, "mem-limit", "0", "memory limit")
flag.IntVar(&opts.QueryTimeout, "query-timeout", 0, "query timeout (in seconds)")
flag.IntVar(&timeout, "timeout", 0, "timeout in ms; set 0 to disable timeout")
flag.BoolVar(&verbose, "v", false, "verbose")
flag.Parse()
Expand All @@ -55,11 +56,6 @@ func main() {
opts.LogOut = os.Stderr
}

connector := impala.NewConnector(&opts)

db := sql.OpenDB(connector)
defer db.Close()

appctx, cancel := context.WithCancel(context.Background())
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
Expand All @@ -71,8 +67,11 @@ func main() {
}
}()

if err := db.PingContext(appctx); err != nil {
log.Fatal(err)
if timeout != 0 {
ctx, release := context.WithTimeout(appctx, time.Duration(timeout*int(time.Millisecond)))
defer release()

appctx = ctx
}

var q string
Expand Down Expand Up @@ -100,28 +99,35 @@ func main() {
q = line
}

if timeout != 0 {
ctx, release := context.WithTimeout(appctx, time.Duration(timeout*int(time.Millisecond)))
defer release()

appctx = ctx
if err := query(appctx, &opts, q); err != nil {
log.Fatal(err)
}

query(appctx, db, q)
//exec(appctx, db, q)
}

func query(ctx context.Context, db *sql.DB, query string) {
func query(ctx context.Context, opts *impala.Options, query string) error {
connector := impala.NewConnector(opts)

db := sql.OpenDB(connector)

defer func () {
db.Close()
}()

if err := db.PingContext(ctx); err != nil {
return err
}

startTime := time.Now()

rows, err := db.QueryContext(ctx, query)
if err != nil {
log.Fatal(err)
return err
}

coltypes, err := rows.ColumnTypes()
if err != nil {
log.Fatal(err)
return err
}

in := make([]reflect.Value, len(coltypes))
Expand Down Expand Up @@ -153,17 +159,19 @@ func query(ctx context.Context, db *sql.DB, query string) {
results++
}
if err := rows.Err(); err != nil {
log.Fatal(err)
return err
}
fmt.Printf("Fetch %d rows(s) in %.2fs\n", results, time.Duration(time.Since(startTime)).Seconds())
return nil
}

func exec(ctx context.Context, db *sql.DB, query string) {
func exec(ctx context.Context, db *sql.DB, query string) error {
res, err := db.ExecContext(ctx, query)
if err != nil {
log.Fatal(err)
return err
}

log.Print(res)
fmt.Print("The operation has no results.\n")
return nil
}
10 changes: 10 additions & 0 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ func parseURI(uri string) (*Options, error) {
opts.MemoryLimit = memLimit[0]
}

queryTimeout, ok := query["query-timeout"]
if ok {
qTimeout, err := strconv.Atoi(queryTimeout[0])
if err != nil {
return nil, err
}
opts.QueryTimeout = qTimeout
}

return &opts, nil
}

Expand Down Expand Up @@ -217,6 +226,7 @@ func connect(opts *Options) (*Conn, error) {
client := hive.NewClient(tclient, logger, &hive.Options{
MaxRows: int64(opts.BatchSize),
MemLimit: opts.MemoryLimit,
QueryTimeout: opts.QueryTimeout,
})

return &Conn{client: client, t: transport, log: logger}, nil
Expand Down
9 changes: 6 additions & 3 deletions hive/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hive
import (
"context"
"log"
"strconv"

"github.com/apache/thrift/lib/go/thrift"
"github.com/bippio/go-impala/services/cli_service"
Expand All @@ -17,8 +18,9 @@ type Client struct {

// Options for Hive Client
type Options struct {
MaxRows int64
MemLimit string
MaxRows int64
MemLimit string
QueryTimeout int
}

// NewClient creates Hive Client
Expand All @@ -34,7 +36,8 @@ func NewClient(client thrift.TClient, log *log.Logger, opts *Options) *Client {
func (c *Client) OpenSession(ctx context.Context) (*Session, error) {

cfg := map[string]string{
"MEM_LIMIT": c.opts.MemLimit,
"MEM_LIMIT": c.opts.MemLimit,
"QUERY_TIMEOUT_S": strconv.Itoa(c.opts.QueryTimeout),
}

req := cli_service.TOpenSessionReq{
Expand Down
13 changes: 7 additions & 6 deletions impala.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ type Options struct {
Username string
Password string

UseLDAP bool
UseTLS bool
CACertPath string
BufferSize int
BatchSize int
MemoryLimit string
UseLDAP bool
UseTLS bool
CACertPath string
BufferSize int
BatchSize int
MemoryLimit string
QueryTimeout int

LogOut io.Writer
}
Expand Down

0 comments on commit 0b96a44

Please sign in to comment.