Skip to content

Commit

Permalink
tests: integration tests for connection-scoped state and context canc… (
Browse files Browse the repository at this point in the history
  • Loading branch information
murfffi authored Jan 18, 2025
1 parent 0264232 commit e7349c7
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
1 change: 1 addition & 0 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func connect(opts *Options) (*isql.Conn, error) {
TLSConfig: tlsConf,
})
} else {
// TODO SocketTimeout, ConnectTimeout Github #34
socket = thrift.NewTSocketConf(addr, &thrift.TConfiguration{})
}

Expand Down
3 changes: 2 additions & 1 deletion internal/hive/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (op *Operation) WaitToFinish(ctx context.Context) error {
for err == nil && opState != cli_service.TOperationState_FINISHED_STATE {
sleep(ctx, duration)
opState, err = op.CheckStateAndStatus(ctx)
// GetState should have returned an error if ctx.Err() but just in case
// It is important to check ctx.Err() as Thrift almost always ignores context - at least up to v0.21.
err = lo.CoalesceOrEmpty(err, ctx.Err())
duration = nextDuration(duration)
}
Expand All @@ -134,6 +134,7 @@ func fetch(ctx context.Context, op *Operation) (*cli_service.TFetchResultsResp,
var duration time.Duration
fetchStatus := cli_service.TStatusCode_STILL_EXECUTING_STATUS
resp := &cli_service.TFetchResultsResp{}
// It is important to check ctx.Err() as Thrift almost always ignores context - at least up to v0.21.
for fetchStatus == cli_service.TStatusCode_STILL_EXECUTING_STATUS && ctx.Err() == nil {
// It is questionable if we need to back-off (sleep) in this case
// impala-shell doesn't - https://github.com/apache/impala/blob/1f35747/shell/impala_client.py#L958
Expand Down
44 changes: 44 additions & 0 deletions internal/isql/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func runHappyCases(t *testing.T, db *sql.DB) {
t.Run("Insert", func(t *testing.T) {
testInsert(t, db)
})
t.Run("Connection State", func(t *testing.T) {
testConnState(t, db)
})
}

func runErrorCases(t *testing.T, db *sql.DB) {
Expand All @@ -143,6 +146,47 @@ func runErrorCases(t *testing.T, db *sql.DB) {
_, err = db.Exec("CREATE TABLE test(a int) LOCATION '/some/location'")
require.ErrorContains(t, err, "ImpalaRuntimeException")
})

t.Run("Context Cancelled", func(t *testing.T) {

startTime := time.Now()
bkgCtx := context.Background()
conn, err := db.Conn(bkgCtx)
require.NoError(t, err)
defer fi.NoErrorF(conn.Close, t)
_, err = conn.ExecContext(bkgCtx, `SET FETCH_ROWS_TIMEOUT_MS="500"`)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(bkgCtx, 1*time.Second)
defer cancel()
row := conn.QueryRowContext(ctx, "SELECT SLEEP(10000)")
var val any
err = row.Scan(&val)
require.NoError(t, row.Err())
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Less(t, time.Since(startTime), 5*time.Second)

})
}

// testConnState verifies that the connection created db.Conn matches 1:1
// to an Impala connection so connection-scoped state persists across calls
func testConnState(t *testing.T, db *sql.DB) {
var err error
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS foo")
require.NoError(t, err)
_, err = db.Exec("DROP TABLE IF EXISTS foo.bar")
require.NoError(t, err)
_, err = db.Exec("CREATE TABLE foo.bar(a int)")
require.NoError(t, err)
ctx := context.Background()
conn, err := db.Conn(ctx)
require.NoError(t, err)
defer fi.NoErrorF(conn.Close, t)
_, err = conn.ExecContext(ctx, "USE foo")
require.NoError(t, err)
res, err := conn.QueryContext(ctx, "SELECT * FROM bar")
require.NoError(t, err)
require.NoError(t, res.Close())
}

func runImpala4SpecificTests(t *testing.T, dsn string) {
Expand Down

0 comments on commit e7349c7

Please sign in to comment.