Rollback does not work well with Go language transactional wrapper

Issue

I have recently started learning Go.

I found the following Github implementation of a wrapper for database transaction processing and decided to try it out.

(source) https://github.com/oreilly-japan/practical-go-programming/blob/master/ch09/transaction/wrapper/main.go

I am using PostgreSQL as the database.

Initially, it contains the following data.

testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   300
(4 rows)

After Process A succeeds, Process B is intentionally made to fail, and a rollback of transaction A is expected. However, when we run it, the rollback does not occur and we end up with the following

In truth, since B failed, the process A should be rolled back and there should be no change in the database value.

I have inserted Logs in places to confirm this, but I am not sure. Why is the rollback not executed?

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    _ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
    *sql.DB
}

type Service struct {
    tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
    log.Printf("transaction")
    tx, err := t.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    if err := f(ctx); err != nil {
        log.Printf("transaction err")
        return fmt.Errorf("transaction query failed: %w", err)
    }

    log.Printf("commit")
    return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
    updateFunc := func(ctx context.Context) error {
        log.Printf("first process")
        // Process A
        if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
            log.Printf("first err")
            return err
        }
        log.Printf("second process")

        // Process B(They are intentionally failing.)
        if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
            log.Printf("second err")
            return err
        }
        return nil
    }
    log.Printf("update")
    return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end
func main() {

    data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
    if nil != err {
        log.Fatal(err)
    }

    database := Service {tx: txAdmin{data}}

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")
}

output

2022/05/26 13:28:55 update     
2022/05/26 13:28:55 transaction
2022/05/26 13:28:55 first process
2022/05/26 13:28:55 second process
2022/05/26 13:28:55 second err
2022/05/26 13:28:55 transaction err

database changes(If the rollback works, the PRICE for id 0004 should remain 300.)

testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   200
(4 rows)

Please tell me how I can use the wrapper to correctly process transactions.

=========
PS.
The following code without the wrapper worked properly.

package main

import (
    "context"
    "database/sql"
    "log"

    _ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-defer-start
type Service struct {
    db *sql.DB
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
        log.Println("update err")
        return err
    }

    if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
        log.Println("update err")
        return err
    }

    return tx.Commit()
}

// transaction-defer-end
func main() {
    var database Service
    dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
    if nil != err {
        log.Fatal(err)
    }
    database.db = dbConn

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")
    
}

Solution

As @Richard Huxton said, pass tx into a function f

here are the steps:

  1. add one field on struct txAdmin to accommodate *sql.Tx, so txAdmin have DB and Tx fields
  2. inside Transaction set tx to *txAdmin.Tx
  3. inside UpdateProduct use *Service.tx.Tx for every query

so the final code looks like this:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    _ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
    *sql.DB
    *sql.Tx
}

type Service struct {
    tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
    log.Printf("transaction")
    tx, err := t.DB.BeginTx(ctx, nil)
    if err != nil {
        return err
    }

    // set tx to Tx
    t.Tx = tx

    defer tx.Rollback()
    if err := f(ctx); err != nil {
        log.Printf("transaction err")
        return fmt.Errorf("transaction query failed: %w", err)
    }

    log.Printf("commit")
    return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
    updateFunc := func(ctx context.Context) error {
        log.Printf("first process")
        // Process A
        if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
            log.Printf("first err")
            return err
        }
        log.Printf("second process")

        // Process B(They are intentionally failing.)
        if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
            log.Printf("second err")
            return err
        }
        return nil
    }
    log.Printf("update")
    return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end
func main() {
    data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
    if nil != err {
        log.Fatal(err)
    }

    database := Service{tx: txAdmin{DB: data}}

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")
}

Answered By – Rahmat Fathoni

Answer Checked By – Timothy Miller (GoLangFix Admin)

Leave a Reply

Your email address will not be published.