I was setting up a greenfield project at work, and spoiled by Spring, I took transaction management for granted. When the need arose, I tried to search for a good implementation and didn’t find anything promising, at least something that had an easy way to set per transaction timeouts, which I have learnt is important.
An easy-to-use transaction manager in a large codebase paves the path to hard-to-untangle deadlocks in your database. I believe client side timeouts can help avoid these.
LLMs did give options, but they felt unpolished.
Considering that I have started exploring Go heavily, and Postgres is my go-to database choice, it felt right to create a pgx transaction manager, gollback, that I could re-use across my projects. One that has support for timeouts, read-only transactions, and most importantly, integration tests with a real database.
structuring transactions in code
Irrespective of the design pattern you use (DDD, 3 layer, hexagonal, onion, or whatever), the structure usually boils down to a service layer that contains business logic and a repository layer that talks to the database.
You want the repository layer to be reusable, by different services, which orchestrate your database changes.
Keeping this in mind, I don’t want the repositories to be aware of whether the queries are being run in a larger transaction or not. This thinking should reside in the service layer, and I have structured my package around this…
- A transaction manager that is injected into the service layer, handling
begin,commit, androllback. - A connection getter to be used by the repository layer.
why would i want transactions in my code?
Transactions help ensure that a set of queries is executed atomically. If a runtime error occurs, or a later insertion fails, or your optimistic lock check fails, you can roll back all the changes made in that transaction in one go.
Before jumping into how this package works, let’s set an example.
type WishlistRepo struct {
pool *pgxpool.Pool
}
func (r *WishlistRepo) Create(ctx context.Context, name string) (*Wishlist, error)
type WishlistItemsRepo struct {
pool *pgxpool.Pool
}
func (r *WishlistItemsRepo) AddItemToWishlist(ctx context.Context, wishlistID int, itemID int, lastPriceUSD float64) error
We have two tables wishlists and wishlist_items, each having a repository. wishlist_items is a many-to-many table, storing items and their last price at the time of adding them to the wishlist.
If we had to do this without transactions, it would look something like this:
type WishlistService struct {
wishlistRepo *WishlistRepo
wishlistItemsRepo *WishlistItemsRepo
itemPricingService *ItemPricingService
}
func (s *WishlistService) CreateWishlistWithItems(ctx context.Context, name string, items []Item) (*Wishlist, error) {
wishlist, err := s.wishlistRepo.Create(ctx, name)
if err != nil {
return nil, err
}
for _, item := range items {
itemPrice := s.itemPricingService.FetchPrice(ctx, item.ID)
err := s.wishlistItemsRepo.AddItemToWishlist(ctx, wishlist.ID, item.ID, itemPrice)
if err != nil {
return nil, err
}
}
return wishlist, nil
}
Any errors when fetching prices or adding items to the wishlist will cause the function to return with an error, leaving the database in an inconsistent state.
Let’s look at how we can modify this with gollback.
the transaction manager
I defined the interface for it as
type TxnManager interface {
RunInTxn(ctx context.Context, fn func(ctx context.Context) error, opts ...TxnOption) error
}
txnProvider implements this interface, and is injected into services.
Let’s look at a basic implementation of RunInTxn…
type txnKey struct{}
func (tp *txnProvider) RunInTxn(ctx context.Context, fn func(ctx context.Context) error, opts ...TxnOption) error {
// check if there is an existing transaction in the context
if _, ok := ctx.Value(txnKey{}).(pgx.Tx); ok {
slog.Warn("RunInTxn called inside existing transaction, reusing")
return fn(ctx)
}
// start a new transaction
txn, err := tp.pool.BeginTx(ctx, txOpts)
if err != nil {
return TxnBeginError{Err: err}
}
// add the transaction to the context
ctx = context.WithValue(ctx, txnKey{}, txn)
// run the function, and rollback if there is an error
if err := fn(ctx); err != nil {
if rbErr := txn.Rollback(ctx); rbErr != nil {
return TxnRollbackError{RollBackErr: rbErr, Cause: err}
}
return err
}
// else commit
if err := txn.Commit(ctx); err != nil {
return TxnCommitError{Err: err}
}
return nil
}
Transactions are started and stored in the context. The key used is a custom type to avoid collisions with other keys.
If the function returns an error, the transaction is rolled back. If it returns a nil, the transaction is committed.
I also chose to not support nested transactions, this is an implementation choice since nested transactions are rarely needed. If required, they can be implemented by starting new transactions or using Postgres SAVEPOINTs. I might revisit and add these in the future.
I added support for timeouts and read-only transactions, using functional options.
type TxnOptions struct {
Timeout time.Duration
ReadOnly bool
}
type TxnOption func(*TxnOptions)
func WithTimeout(d time.Duration) TxnOption {
return func(opts *TxnOptions) {
opts.Timeout = d
}
}
func ReadOnly() TxnOption {
return func(opts *TxnOptions) {
opts.ReadOnly = true
}
}
Read-only transactions are handled by passing the pgx.TxOptions{ReadOnly: true} option to BeginTx.
Timeouts are a little trickier; we set a timeout on the context passed to the function.
In typical Go fashion, fn is expected to handle the context cancellation.
func (tp *txnProvider) RunInTxn(ctx context.Context, fn func(ctx context.Context) error, opts ...TxnOption) error {
if _, ok := ctx.Value(txnKey{}).(pgx.Tx); ok {
slog.Warn("RunInTxn called inside existing transaction, reusing")
return fn(ctx)
}
// read options
options := &TxnOptions{}
for _, opt := range opts {
opt(options)
}
// set timeout on context
if options.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, options.Timeout)
defer cancel()
}
// set read-only
txOpts := pgx.TxOptions{}
if options.ReadOnly {
txOpts.AccessMode = pgx.ReadOnly
}
// pass txOpts
txn, err := tp.pool.BeginTx(ctx, txOpts)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
slog.Warn("transaction begin failed due to time out")
}
return TxnBeginError{Err: err}
}
ctx = context.WithValue(ctx, txnKey{}, txn)
if err := fn(ctx); err != nil {
// if the function returns a DeadlineExceeded error, roll the transaction back
if ctx.Err() == context.DeadlineExceeded {
slog.Warn("transaction closing due to timeout")
}
// rollback with the background context
rbCtx, rbCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer rbCancel()
if rbErr := txn.Rollback(rbCtx); rbErr != nil {
return TxnRollbackError{RollBackErr: rbErr, Cause: err}
}
return err
}
if err := txn.Commit(ctx); err != nil {
// if we attempt a commit after the context has expired, we log a warning
if ctx.Err() == context.DeadlineExceeded {
slog.Warn("transaction commit failed due to time out")
}
return TxnCommitError{Err: err}
}
return nil
}
If you look closely, the transaction isn’t closed until the function returns (an error or a nil) and the transaction remains open until then.
Forcing the transaction to rollback on timeout can lead to a race condition if fn is running a query at timeout.
the connection getter
Let’s now look at how the repository layers can use the transactions we pass in context.
We essentially want the repository to be agnostic to the kind of pgx connection (Conn, Pool, Tx) that it’s using.
We can enable this by creating an interface Conn.
type Conn interface {
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, arguments ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, arguments ...any) pgx.Row
}
And we define a typed function ConnGetter that returns a Tx if the context has one, and a Pool otherwise.
type ConnGetter func(ctx context.Context) Conn
I decided to use a typed function instead of a method because the response of this function depends solely on its input (the context).
Wrapping it all up, we create a constructor for txnProvider.
type txnProvider struct {
pool *pgxpool.Pool
}
func NewTxnProvider(pool *pgxpool.Pool) (TxnManager, ConnGetter) {
tp := &txnProvider{pool: pool}
return tp, func(ctx context.Context) Conn {
if txn, ok := ctx.Value(txnKey{}).(pgx.Tx); ok {
return txn
}
return pool
}
}
NewTxnProvider returns a TxnManager (which can be injected into services) and a ConnGetter (which can be used by repositories).
coming back to the example
Let’s make simple changes to the repositories and service to use the txnProvider.
type WishlistRepo struct {
connGetter ConnGetter
}
func (r *WishlistRepo) Create(ctx context.Context, name string) (*Wishlist, error) {
conn := r.connGetter(ctx)
// ... use conn.QueryRow()
}
type WishlistItemsRepo struct {
connGetter ConnGetter
}
type WishlistService struct {
wishlistRepo *WishlistRepo
wishlistItemsRepo *WishlistItemsRepo
itemPricingService *ItemPricingService
txnManager TxnManager
}
CreateWishlistWithItems can now be written as…
func (s *WishlistService) CreateWishlistWithItems(ctx context.Context, name string, items []Item) (*Wishlist, error) {
var wishlist *Wishlist
err := s.txnManager.RunInTxn(ctx, func(ctx context.Context) error {
wishlist, err := s.wishlistRepo.Create(ctx, name)
if err != nil {
return err
}
for _, item := range items {
itemPrice := s.itemPricingService.FetchPrice(ctx, item.ID)
err := s.wishlistItemsRepo.AddItemToWishlist(ctx, wishlist.ID, item.ID, itemPrice)
if err != nil {
return err
}
}
return nil
}, WithTimeout(30*time.Second))
if err != nil {
return nil, fmt.Errorf("failed to create wishlist: %w", err)
}
return wishlist, nil
}
Now any errors that happen inside the transaction will cause a rollback, making sure the database is not left in an inconsistent state. Since we have set a 30s timeout, we can be sure that if our price fetching service slows down, the transaction will be rolled back and stopped.
If you’re interested in the code, or how different edge cases are handled, you can check out the code on github. The integration tests cover all edge cases I could think of and are straightforward to run.
I will likely be adding more features to it as I use it in my projects, but feel free to open issues or PRs if you have any suggestions. Some I have in mind are:
- Per transaction isolation levels
- Nested transactions with SAVEPOINTs
- Support for more database packages,
database/sqlmost importantly.