Skip to content

Commit

Permalink
Merge pull request #2 from btnguyen2k/dev
Browse files Browse the repository at this point in the history
Prepare to release v0.2.0
  • Loading branch information
btnguyen2k authored May 31, 2023
2 parents 22b773c + cb5c938 commit cc79d9f
Show file tree
Hide file tree
Showing 13 changed files with 920 additions and 405 deletions.
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,44 @@ func main() {
- `UPDATE`
- `DELETE`

## Transaction support

`godynamo` supports transactions that consist of write statements (e.g. `INSERT`, `UPDATE` and `DELETE`) since [v0.2.0](RELEASE-NOTES.md). Please note the following:

- Any limitation set by [DynamoDB/PartiQL](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.multiplestatements.transactions.html) will apply.
- [Table](SQL_TABLE.md) and [Index](SQL_INDEX.md) statements are not supported.
- `UPDATE`/`DELETE` with `RETURNING` and `SELECT` statements are not supported.

Example:
```go
tx, err := db.Begin()
if err != nil {
panic(err)
}
defer tx.Rollback()
result1, _ := tx.Exec(`INSERT INTO "tbltest" VALUE {'app': ?, 'user': ?, 'active': ?}`, "app0", "user1", true)
result2, _ := tx.Exec(`INSERT INTO "tbltest" VALUE {'app': ?, 'user': ?, 'duration': ?}`, "app0", "user2", 1.23)
err = tx.Commit()
if err != nil {
panic(err)
}
rowsAffected1, err1 := fmt.Println(result1.RowsAffected())
if err1 != nil {
panic(err1)
}
fmt.Println("RowsAffected:", rowsAffected1) // output "RowsAffected: 1"

rowsAffected2, err2 := fmt.Println(result2.RowsAffected())
if err2 != nil {
panic(err2)
}
fmt.Println("RowsAffected:", rowsAffected2) // output "RowsAffected: 1"
```

> If a statement's condition check fails (e.g. deleting non-existing item), the whole transaction will also fail. This behaviour is different from executing statements in non-transactional mode where failed condition check results in `0` affected row without error.
>
> You can use [`EXISTS` function](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-functions.exists.html) for condition checking.
## License

MIT - See [LICENSE.md](LICENSE.md).
4 changes: 4 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# godynamo release notes

## 2023-05-31 - v0.2.0

- Add transation support.

## 2023-05-27 - v0.1.0

- Driver for `database/sql`, supported statements:
Expand Down
8 changes: 8 additions & 0 deletions SQL_DOCUMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ Sample result:
|--------|
|"AU" |

> If there is no matched item, the error `ConditionalCheckFailedException` is suspended. That means:
> - `RowsAffected()` returns `(0, nil)`
> - `Query` returns empty result set.
## DELETE

Syntax: [PartiQL delete statements for DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.delete.html)
Expand Down Expand Up @@ -103,3 +107,7 @@ Sample result:
|app|location|platform|user|
|---|--------|--------|----|
|"app0"|"AU"|"Windows"|"user1"|

> If there is no matched item, the error `ConditionalCheckFailedException` is suspended. That means:
> - `RowsAffected()` returns `(0, nil)`
> - `Query` returns empty result set.
151 changes: 139 additions & 12 deletions conn.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,161 @@
package godynamo

import (
"context"
"database/sql/driver"
"errors"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// Conn is AWS DynamoDB connection handler.
var (
ErrInTx = errors.New("statement is in the middle of a transaction, result is not available until committed")
ErrInvalidTxStage = errors.New("invalid transaction stage, cannot execute statement ")
)

type txMode int

const (
txNone txMode = iota
txStarted
txCommitting
)

type txStmt struct {
stmt *Stmt
values []driver.NamedValue
output *dynamodb.ExecuteStatementOutput
}

type executeStatementOutputWrapper func() *dynamodb.ExecuteStatementOutput

// Conn is AWS DynamoDB implementation of driver.Conn.
type Conn struct {
client *dynamodb.Client //AWS DynamoDB client
client *dynamodb.Client // AWS DynamoDB client
timeout time.Duration
txMode txMode
txStmtList []*txStmt
}

// Close implements driver.Conn.Close.
func (c *Conn) Close() error {
return nil
func (c *Conn) ensureContext(ctx context.Context) context.Context {
if ctx == nil {
ctx, _ = context.WithTimeout(context.Background(), c.timeout)
}
return ctx
}

// Begin implements driver.Conn.Begin.
func (c *Conn) Begin() (driver.Tx, error) {
return nil, errors.New("transaction is not supported")
func (c *Conn) commit() error {
defer func() {
c.txMode = txNone
c.txStmtList = nil
}()
c.txMode = txCommitting
txStmts := make([]types.ParameterizedStatement, len(c.txStmtList))
for i, txStmt := range c.txStmtList {
params := make([]types.AttributeValue, len(txStmt.values))
var err error
for j, v := range txStmt.values {
params[j], err = ToAttributeValue(v.Value)
if err != nil {
return fmt.Errorf("error marshalling parameter %d-th for statement <%s>: %s", j+1, txStmt.stmt.query, err)
}
}
txStmts[i] = types.ParameterizedStatement{Statement: &txStmt.stmt.query, Parameters: params}
}
input := &dynamodb.ExecuteTransactionInput{
TransactStatements: txStmts,
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
}
outputExecuteTransaction, err := c.client.ExecuteTransaction(c.ensureContext(nil), input)
if err == nil {
for i, txStmt := range c.txStmtList {
txStmt.output = &dynamodb.ExecuteStatementOutput{ResultMetadata: outputExecuteTransaction.ResultMetadata}
if len(outputExecuteTransaction.ConsumedCapacity) > i {
txStmt.output.ConsumedCapacity = &outputExecuteTransaction.ConsumedCapacity[i]
}
if len(outputExecuteTransaction.Responses) > i {
txStmt.output.Items = []map[string]types.AttributeValue{outputExecuteTransaction.Responses[i].Item}
}
}
}
return err
}

// CheckNamedValue implements driver.NamedValueChecker.CheckNamedValue.
func (c *Conn) CheckNamedValue(value *driver.NamedValue) error {
// since DynamoDB is document db, it accepts any value types
func (c *Conn) rollback() error {
defer func() {
c.txMode = txNone
c.txStmtList = nil
}()
return nil
}

// Prepare implements driver.Conn.Prepare.
// execute executes a PartiQL query and returns the result output.
func (c *Conn) executeContext(ctx context.Context, stmt *Stmt, values []driver.NamedValue) (executeStatementOutputWrapper, error) {
if c.txMode == txStarted {
txStmt := txStmt{stmt: stmt, values: values}
c.txStmtList = append(c.txStmtList, &txStmt)
return func() *dynamodb.ExecuteStatementOutput {
return txStmt.output
}, ErrInTx
}
if c.txMode != txNone {
return nil, ErrInvalidTxStage
}
params := make([]types.AttributeValue, len(values))
var err error
for i, v := range values {
params[i], err = ToAttributeValue(v.Value)
if err != nil {
return nil, fmt.Errorf("error marshalling parameter %d-th: %s", i+1, err)
}
}
input := &dynamodb.ExecuteStatementInput{
Statement: &stmt.query,
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
Parameters: params,
}
output, err := c.client.ExecuteStatement(c.ensureContext(ctx), input)
return func() *dynamodb.ExecuteStatementOutput {
return output
}, err
}

// Prepare implements driver.Conn/Prepare.
func (c *Conn) Prepare(query string) (driver.Stmt, error) {
return c.PrepareContext(context.Background(), query)
}

// PrepareContext implements driver.ConnPrepareContext/PrepareContext.
//
// @Available since v0.2.0
func (c *Conn) PrepareContext(_ context.Context, query string) (driver.Stmt, error) {
return parseQuery(c, query)
}

// Close implements driver.Conn/Close.
func (c *Conn) Close() error {
return nil
}

// Begin implements driver.Conn/Begin.
func (c *Conn) Begin() (driver.Tx, error) {
return c.BeginTx(context.Background(), driver.TxOptions{})
}

// BeginTx implements driver.ConnBeginTx/BeginTx.
//
// @Available since v0.2.0
func (c *Conn) BeginTx(_ context.Context, _ driver.TxOptions) (driver.Tx, error) {
c.txMode = txStarted
c.txStmtList = make([]*txStmt, 0)
return &Tx{conn: c}, nil
}

// CheckNamedValue implements driver.NamedValueChecker/CheckNamedValue.
func (c *Conn) CheckNamedValue(_ *driver.NamedValue) error {
// since DynamoDB is document db, it accepts any value types
return nil
}
22 changes: 12 additions & 10 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,17 @@ func init() {
sql.Register("godynamo", &Driver{})
}

// Driver is AWS DynamoDB driver for database/sql.
// Driver is AWS DynamoDB implementation of driver.Driver.
type Driver struct {
}

// Open implements driver.Driver.Open.
// Open implements driver.Driver/Open.
//
// connStr is expected in the following format:
//
// Region=<region>;AkId=<aws-key-id>;Secret_Key=<aws-secret-key>[;Endpoint=<dynamodb-endpoint>]
// Region=<region>;AkId=<aws-key-id>;Secret_Key=<aws-secret-key>[;Endpoint=<dynamodb-endpoint>][;TimeoutMs=<timeout-in-milliseconds>]
//
// If not supplied, default value for TimeoutMs is 10 seconds, Version is defaultApiVersion (which is "2018-12-31"), AutoId is true, and InsecureSkipVerify is false
//
// - DefaultDb is added since v0.1.1
// - AutoId is added since v0.1.2
// - InsecureSkipVerify is added since v0.1.4
// If not supplied, default value for TimeoutMs is 10 seconds.
func (d *Driver) Open(connStr string) (driver.Conn, error) {
params := make(map[string]string)
parts := strings.Split(connStr, ";")
Expand All @@ -57,12 +53,18 @@ func (d *Driver) Open(connStr string) (driver.Conn, error) {
akid := params["AKID"]
if akid == "" {
akid = os.Getenv("AWS_ACCESS_KEY_ID")
if akid == "" {
akid = os.Getenv("AWS_AKID")
}
}
secretKey := params["SECRET_KEY"]
if secretKey == "" {
secretKey = params["SECRETKEY"]
if secretKey == "" {
secretKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
secretKey = os.Getenv("AWS_SECRET_KEY")
if secretKey == "" {
secretKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
}
}
}
opts := dynamodb.Options{
Expand All @@ -81,5 +83,5 @@ func (d *Driver) Open(connStr string) (driver.Conn, error) {
}
}
client := dynamodb.New(opts)
return &Conn{client: client}, nil
return &Conn{client: client, timeout: time.Duration(timeoutMs) * time.Millisecond}, nil
}
27 changes: 26 additions & 1 deletion godynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
package godynamo

import (
"database/sql/driver"
"reflect"
"strconv"

"github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
Expand All @@ -12,7 +14,7 @@ import (

const (
// Version of package godynamo.
Version = "0.1.0"
Version = "0.2.0"
)

var (
Expand Down Expand Up @@ -46,6 +48,29 @@ func IsAwsError(err error, awsErrCode string) bool {
return false
}

// ValuesToNamedValues transforms a []driver.Value to []driver.NamedValue.
//
// @Available since v0.2.0
func ValuesToNamedValues(values []driver.Value) []driver.NamedValue {
result := make([]driver.NamedValue, len(values))
for i, v := range values {
result[i] = driver.NamedValue{Name: "$" + strconv.Itoa(i+1), Ordinal: i, Value: v}
}
return result
}

// // NamedValuesToValues transforms a []driver.NamedValue to []driver.Value.
// //
// // @Available since v0.2.0
// func NamedValuesToValues(values []driver.NamedValue) []driver.Value {
// result := make([]driver.Value, len(values))
// for i, v := range values {
// result[i] = v.Value
// }
// return result
// }

// ToAttributeValue marshals a Go value to AWS AttributeValue.
func ToAttributeValue(value interface{}) (types.AttributeValue, error) {
if av, ok := value.(types.AttributeValue); ok {
return av, nil
Expand Down
20 changes: 10 additions & 10 deletions godynamo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,16 @@ func TestDriver_Conn(t *testing.T) {
defer conn.Close()
}

func TestDriver_Transaction(t *testing.T) {
testName := "TestDriver_Transaction"
db := _openDb(t, testName)
defer db.Close()
if tx, err := db.BeginTx(context.Background(), nil); tx != nil || err == nil {
t.Fatalf("%s failed: transaction is not supported yet", testName)
} else if strings.Index(err.Error(), "not supported") < 0 {
t.Fatalf("%s failed: transaction is not supported yet / %s", testName, err)
}
}
// func TestDriver_Transaction(t *testing.T) {
// testName := "TestDriver_Transaction"
// db := _openDb(t, testName)
// defer db.Close()
// if tx, err := db.BeginTx(context.Background(), nil); tx != nil || err == nil {
// t.Fatalf("%s failed: transaction is not supported yet", testName)
// } else if strings.Index(err.Error(), "not supported") < 0 {
// t.Fatalf("%s failed: transaction is not supported yet / %s", testName, err)
// }
// }

func TestDriver_Open(t *testing.T) {
testName := "TestDriver_Open"
Expand Down
Loading

0 comments on commit cc79d9f

Please sign in to comment.