Skip to content

Commit cab2f4f

Browse files
committed
refactor(client): implement tx queue to handle single tx per block limitation
Due to Akash not yet supporting multiple transactions per block, implement: - Add transaction worker and queue system to serialize concurrent transactions - Track block heights via node status CLI command - Implement transaction confirmation waiting mechanism - Add helper methods for transaction status querying - Improve error handling for failed transactions - Introduce new types for transaction worker and node status
1 parent c5cb55f commit cab2f4f

File tree

8 files changed

+220
-26
lines changed

8 files changed

+220
-26
lines changed

akash/client/cli/cli.go

+33-6
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ func envExists(key string) bool {
1313
}
1414

1515
type AkashCommand struct {
16-
ctx context.Context
17-
Content []string
16+
client AkashCliClient
17+
ctx context.Context
18+
Content []string
19+
blockHeight string
1820
}
1921

2022
type AkashCliClient interface {
@@ -29,6 +31,7 @@ func AkashCli(client AkashCliClient) AkashCommand {
2931
}
3032

3133
return AkashCommand{
34+
client: client,
3235
ctx: client.GetContext(),
3336
Content: []string{path},
3437
}
@@ -38,6 +41,18 @@ func (c AkashCommand) Tx() AkashCommand {
3841
return c.append("tx")
3942
}
4043

44+
func (c AkashCommand) Query() AkashCommand {
45+
return c.append("query")
46+
}
47+
48+
func (c AkashCommand) SetHash(hash string) AkashCommand {
49+
return c.append(hash)
50+
}
51+
52+
func (c AkashCommand) QueryTx() AkashCommand {
53+
return c.append("tx")
54+
}
55+
4156
func (c AkashCommand) Deployment() AkashCommand {
4257
return c.append("deployment")
4358
}
@@ -66,10 +81,6 @@ func (c AkashCommand) Close() AkashCommand {
6681
return c.append("close")
6782
}
6883

69-
func (c AkashCommand) Query() AkashCommand {
70-
return c.append("query")
71-
}
72-
7384
func (c AkashCommand) Market() AkashCommand {
7485
return c.append("market")
7586
}
@@ -78,6 +89,14 @@ func (c AkashCommand) Provider() AkashCommand {
7889
return c.append("provider")
7990
}
8091

92+
func (c AkashCommand) Node() AkashCommand {
93+
return c.append("node")
94+
}
95+
96+
func (c AkashCommand) Status() AkashCommand {
97+
return c.append("status")
98+
}
99+
81100
func (c AkashCommand) Bid() AkashCommand {
82101
return c.append("bid")
83102
}
@@ -206,3 +225,11 @@ func (c AkashCommand) append(str string) AkashCommand {
206225
c.Content = append(c.Content, str)
207226
return c
208227
}
228+
229+
func (c *AkashCommand) SetBlockHeight(height string) {
230+
c.blockHeight = height
231+
}
232+
233+
func (c AkashCommand) GetBlockHeight() string {
234+
return c.blockHeight
235+
}

akash/client/cli/cmd.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"strings"
1111
)
1212

13-
func (c AkashCommand) AsCmd() *exec.Cmd {
13+
func (c *AkashCommand) AsCmd() *exec.Cmd {
1414
return exec.Command(
1515
c.Content[0],
1616
c.Headless()...,
@@ -21,19 +21,18 @@ type AkashErrorResponse struct {
2121
RawLog string `json:"raw_log"`
2222
}
2323

24-
func (c AkashCommand) Raw() ([]byte, error) {
24+
func (c *AkashCommand) Raw() ([]byte, error) {
2525
cmd := c.AsCmd()
26-
2726
tflog.Debug(c.ctx, strings.Join(cmd.Args, " "))
2827

2928
var errb bytes.Buffer
29+
3030
cmd.Stderr = &errb
31+
32+
// Normal handling for all other commands
3133
out, err := cmd.Output()
3234
if err != nil {
3335
tflog.Warn(c.ctx, fmt.Sprintf("Could not execute command: %s", err.Error()))
34-
if strings.Contains(errb.String(), "error unmarshalling") {
35-
return c.Raw()
36-
}
3736

3837
var akErr AkashErrorResponse
3938
err := json.Unmarshal(out, &akErr)
@@ -61,14 +60,11 @@ func (c AkashCommand) DecodeJson(v any) error {
6160
tflog.Debug(c.ctx, strings.Join(cmd.Args, " "))
6261

6362
var errb bytes.Buffer
63+
6464
cmd.Stderr = &errb
6565
out, err := cmd.Output()
6666
if err != nil {
6767
tflog.Warn(c.ctx, fmt.Sprintf("Could not execute command: %s", err.Error()))
68-
if strings.Contains(errb.String(), "error unmarshalling") {
69-
return c.DecodeJson(v)
70-
}
71-
7268
return errors.New(errb.String())
7369
}
7470

akash/client/client.go

+74-1
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,18 @@ package client
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"terraform-provider-akash/akash/client/cli"
8+
"terraform-provider-akash/akash/client/types"
9+
"time"
510
)
611

712
type AkashClient struct {
813
ctx context.Context
914
Config AkashProviderConfiguration
1015
transactionNote string
16+
txQueue chan types.TxRequest
1117
}
1218

1319
type AkashProviderConfiguration struct {
@@ -36,5 +42,72 @@ func (ak *AkashClient) SetGlobalTransactionNote(note string) {
3642
}
3743

3844
func New(ctx context.Context, configuration AkashProviderConfiguration) *AkashClient {
39-
return &AkashClient{ctx: ctx, Config: configuration}
45+
client := &AkashClient{
46+
ctx: ctx,
47+
Config: configuration,
48+
txQueue: make(chan types.TxRequest, 100),
49+
}
50+
51+
go client.txWorker()
52+
53+
return client
54+
}
55+
56+
func (ak *AkashClient) txWorker() {
57+
for req := range ak.txQueue {
58+
txId, err := req.Handler()
59+
if err != nil {
60+
req.Result <- types.TxResult{Err: err}
61+
}
62+
63+
err = ak.waitForTx(txId)
64+
req.Result <- types.TxResult{Err: err}
65+
close(req.Result)
66+
}
67+
}
68+
69+
func (ak *AkashClient) waitForTx(txHash string) error {
70+
timer := time.NewTimer(30 * time.Second)
71+
defer timer.Stop()
72+
73+
for {
74+
select {
75+
case <-timer.C:
76+
return errors.New("timeout waiting for transaction confirmation")
77+
default:
78+
cmd := cli.AkashCli(ak).Query().Tx().SetHash(txHash).
79+
SetNode(ak.Config.Node).
80+
OutputJson()
81+
82+
var txResponse types.Transaction
83+
if err := cmd.DecodeJson(&txResponse); err != nil {
84+
time.Sleep(500 * time.Millisecond)
85+
continue
86+
}
87+
88+
if txResponse.Failed() {
89+
return fmt.Errorf("transaction failed: %s", txResponse.RawLog)
90+
}
91+
92+
return nil
93+
}
94+
}
95+
}
96+
97+
func (ak *AkashClient) WaitForTransaction(handler types.TxHandler) error {
98+
resultChan := make(chan types.TxResult, 1)
99+
ak.txQueue <- types.TxRequest{
100+
Handler: handler,
101+
Result: resultChan,
102+
}
103+
104+
timer := time.NewTimer(30 * time.Second)
105+
defer timer.Stop()
106+
107+
select {
108+
case <-timer.C:
109+
return errors.New("timeout waiting for transaction confirmation")
110+
case result := <-resultChan:
111+
return result.Err
112+
}
40113
}

akash/client/deployment.go

+40-7
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,33 @@ func transactionCreateDeployment(ak *AkashClient, manifestLocation string) (type
6060
SetNode(ak.Config.Node).
6161
SetNote(ak.transactionNote).OutputJson()
6262

63-
transaction := types.Transaction{}
64-
if err := cmd.DecodeJson(&transaction); err != nil {
63+
var transaction types.Transaction
64+
65+
err := ak.WaitForTransaction(func() (string, error) {
66+
if err := cmd.DecodeJson(&transaction); err != nil {
67+
return "", err
68+
}
69+
return transaction.TxHash, nil
70+
})
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
if transaction.Failed() {
76+
return nil, fmt.Errorf("transaction failed: %s", transaction.RawLog)
77+
}
78+
79+
// Query the transaction to get its events
80+
queryCmd := cli.AkashCli(ak).Query().QueryTx().SetHash(transaction.TxHash).
81+
SetNode(ak.Config.Node).
82+
OutputJson()
83+
84+
if err := queryCmd.DecodeJson(&transaction); err != nil {
6585
return nil, err
6686
}
6787

6888
if len(transaction.Logs) == 0 {
69-
return nil, errors.New(fmt.Sprintf("something went wrong: %s", transaction.RawLog))
89+
return nil, errors.New("no transaction logs found")
7090
}
7191

7292
return transaction.Logs[0].Events[0].Attributes, nil
@@ -80,12 +100,18 @@ func (ak *AkashClient) DeleteDeployment(dseq string, owner string) error {
80100
SetNode(ak.Config.Node).
81101
SetNote(ak.transactionNote).AutoAccept().OutputJson()
82102

83-
out, err := cmd.Raw()
103+
var transaction types.Transaction
104+
err := ak.WaitForTransaction(func() (string, error) {
105+
if err := cmd.DecodeJson(&transaction); err != nil {
106+
return "", err
107+
}
108+
return transaction.TxHash, nil
109+
})
84110
if err != nil {
85111
return err
86112
}
87113

88-
tflog.Debug(ak.ctx, fmt.Sprintf("Response: %s", out))
114+
tflog.Debug(ak.ctx, fmt.Sprintf("Response: %s", transaction.RawLog))
89115

90116
return nil
91117
}
@@ -99,12 +125,19 @@ func (ak *AkashClient) UpdateDeployment(dseq string, manifestLocation string) er
99125
SetNote(ak.transactionNote).
100126
GasAuto().SetGasAdjustment(1.5).SetGasPrices().SetSignMode("amino-json").AutoAccept().OutputJson()
101127

102-
out, err := cmd.Raw()
128+
var transaction types.Transaction
129+
err := ak.WaitForTransaction(func() (string, error) {
130+
if err := cmd.DecodeJson(&transaction); err != nil {
131+
return "", err
132+
}
133+
return transaction.TxHash, nil
134+
})
135+
103136
if err != nil {
104137
return err
105138
}
106139

107-
tflog.Debug(ak.ctx, fmt.Sprintf("Response: %s", out))
140+
tflog.Debug(ak.ctx, fmt.Sprintf("Response: %s", transaction.RawLog))
108141

109142
return nil
110143
}

akash/client/lease.go

+25-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package client
22

33
import (
4+
"encoding/json"
5+
"strings"
46
"terraform-provider-akash/akash/client/cli"
7+
"terraform-provider-akash/akash/client/types"
58
)
69

710
func (ak *AkashClient) CreateLease(seqs Seqs, provider string) (string, error) {
@@ -11,8 +14,28 @@ func (ak *AkashClient) CreateLease(seqs Seqs, provider string) (string, error) {
1114
DefaultGas().SetChainId(ak.Config.ChainId).SetKeyringBackend(ak.Config.KeyringBackend).
1215
SetNote(ak.transactionNote).AutoAccept().SetNode(ak.Config.Node).OutputJson()
1316

14-
out, err := cmd.Raw()
15-
if err != nil {
17+
var out []byte
18+
var err error
19+
var transaction types.Transaction
20+
21+
if err = ak.WaitForTransaction(func() (string, error) {
22+
out, err = cmd.Raw()
23+
if err != nil {
24+
return "", err
25+
}
26+
27+
err = json.Unmarshal(out, &transaction)
28+
if err != nil {
29+
return "", err
30+
}
31+
32+
err = json.NewDecoder(strings.NewReader(string(out))).Decode(&transaction)
33+
if err != nil {
34+
return "", err
35+
}
36+
37+
return transaction.TxHash, nil
38+
}); err != nil {
1639
return "", err
1740
}
1841

akash/client/types/node_status.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package types
2+
3+
type NodeStatus struct {
4+
SyncInfo struct {
5+
LatestBlockHeight string `json:"latest_block_height"`
6+
LatestBlockTime string `json:"latest_block_time"`
7+
CatchingUp bool `json:"catching_up"`
8+
} `json:"SyncInfo"`
9+
}

akash/client/types/transactions.go

+21
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import "errors"
44

55
type Transaction struct {
66
Height string `json:"height"`
7+
TxHash string `json:"txhash"`
78
Logs []TransactionLog `json:"logs"`
89
RawLog string `json:"raw_log"`
910
}
@@ -33,3 +34,23 @@ func (a TransactionEventAttributes) Get(key string) (string, error) {
3334

3435
return "", errors.New("attribute not found")
3536
}
37+
38+
func (t Transaction) Failed() bool {
39+
return len(t.Logs) == 0
40+
}
41+
42+
func (t Transaction) GetEventsByType(eventType string) []TransactionEvent {
43+
if len(t.Logs) == 0 {
44+
return nil
45+
}
46+
47+
var matches []TransactionEvent
48+
for _, log := range t.Logs {
49+
for _, event := range log.Events {
50+
if event.Type == eventType {
51+
matches = append(matches, event)
52+
}
53+
}
54+
}
55+
return matches
56+
}

akash/client/types/tx_worker.go

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package types
2+
3+
type TxHandler func() (string, error)
4+
5+
type TxRequest struct {
6+
Handler TxHandler
7+
Result chan TxResult
8+
}
9+
10+
type TxResult struct {
11+
Err error
12+
}

0 commit comments

Comments
 (0)