17
17
package loxinet
18
18
19
19
import (
20
+ "errors"
21
+ "fmt"
20
22
cmn "github.com/loxilb-io/loxilb/common"
21
23
opts "github.com/loxilb-io/loxilb/options"
24
+ bfd "github.com/loxilb-io/loxilb/proto"
22
25
tk "github.com/loxilb-io/loxilib"
23
-
24
- "bufio"
25
- "errors"
26
- "fmt"
27
26
"net"
28
- "os"
29
- "os/exec"
30
27
"time"
31
28
)
32
29
@@ -60,13 +57,25 @@ type ClusterNode struct {
60
57
// CIStateH - Cluster context handler
61
58
type CIStateH struct {
62
59
SpawnKa bool
63
- kaMode bool
60
+ RemoteIP net. IP
64
61
ClusterMap map [string ]* ClusterInstance
65
62
StateMap map [string ]int
66
63
NodeMap map [string ]* ClusterNode
67
64
}
68
65
69
- func kaSpawn () {
66
+ func (ci * CIStateH ) BFDSessionNotify (instance string , remote string , ciState string ) {
67
+ var sm cmn.HASMod
68
+
69
+ sm .Instance = instance
70
+ sm .State = ciState
71
+ sm .Vip = net .ParseIP ("0.0.0.0" )
72
+ tk .LogIt (tk .LogInfo , "ci-change instance %s - state %s vip %v\n " , instance , ciState , sm .Vip )
73
+ mh .mtx .Lock ()
74
+ defer mh .mtx .Unlock ()
75
+ ci .CIStateUpdate (sm )
76
+ }
77
+
78
+ func (ci * CIStateH ) startBFDProto () {
70
79
url := fmt .Sprintf ("http://127.0.0.1:%d/config/params" , opts .Opts .Port )
71
80
for {
72
81
if IsLoxiAPIActive (url ) {
@@ -76,107 +85,31 @@ func kaSpawn() {
76
85
time .Sleep (1 * time .Second )
77
86
}
78
87
79
- RunCommand ("rm -f /etc/shared/keepalive.state" , false )
80
- RunCommand ("pkill keepalived" , false )
81
88
mh .dp .WaitXsyncReady ("ka" )
82
89
// We need some cool-off period for loxilb to self sync-up in the cluster
83
90
time .Sleep (KAInitTiVal * time .Second )
84
91
85
- for {
86
- if exists := FileExists (KAConfigFile ); ! exists {
87
- time .Sleep (2000 * time .Millisecond )
88
- continue
89
- }
90
-
91
- pid := ReadPIDFile (KAPidFile1 )
92
- if pid != 0 {
93
- time .Sleep (5000 * time .Millisecond )
94
- continue
95
- }
96
-
97
- tk .LogIt (tk .LogInfo , "KA spawning\n " )
98
- cmd := exec .Command ("/usr/sbin/keepalived" , "-f" , KAConfigFile , "-n" )
99
- err := cmd .Run ()
100
- if err != nil {
101
- tk .LogIt (tk .LogError , "Error in running KA:%s\n " , err )
102
- } else {
103
- tk .LogIt (tk .LogInfo , "KA found dead. Reaping\n " )
104
- }
105
-
106
- rmf := fmt .Sprintf ("rm -f %s" , KAPidFile1 )
107
- RunCommand (rmf , false )
108
- rmf = fmt .Sprintf ("rm -f %s" , KAPidFile2 )
109
- RunCommand (rmf , false )
110
-
111
- time .Sleep (2000 * time .Millisecond )
112
- }
113
- }
114
-
115
- func (h * CIStateH ) CISync () {
116
- var sm cmn.HASMod
117
- var ciState int
118
- var ok bool
119
- clusterStateFile := "/etc/shared/keepalive.state"
120
- rf , err := os .Open (clusterStateFile )
121
- if err == nil {
122
-
123
- fsc := bufio .NewScanner (rf )
124
- fsc .Split (bufio .ScanLines )
125
-
126
- for fsc .Scan () {
127
- var inst string
128
- var state string
129
- var vip string
130
- // Format style -
131
- // INSTANCE default is in BACKUP state
132
- _ , err = fmt .Sscanf (fsc .Text (), "INSTANCE %s is in %s state vip %s" , & inst , & state , & vip )
133
- if err != nil {
134
- continue
135
- }
136
-
137
- if ciState , ok = h .StateMap [state ]; ! ok {
138
- continue
139
- }
140
-
141
- notify := false
142
-
143
- if eci , ok := h .ClusterMap [inst ]; ! ok {
144
- notify = true
145
- } else {
146
- if eci .State != ciState {
147
- notify = true
148
- }
149
- }
150
-
151
- if notify {
152
- sm .Instance = inst
153
- sm .State = state
154
- sm .Vip = net .ParseIP (vip )
155
- tk .LogIt (tk .LogInfo , "ci-change instance %s - state %s vip %v\n " , inst , state , sm .Vip )
156
- h .CIStateUpdate (sm )
157
- }
158
- }
159
-
160
- rf .Close ()
92
+ bs := bfd .StructNew (3784 )
93
+ err := bs .BFDAddRemote (ci .RemoteIP .String (), 3784 , bfd .BFDMinSysTXIntervalUs , 3 , "Default" , ci )
94
+ if err != nil {
95
+ tk .LogIt (tk .LogCritical , "KA - Cant add BFD remote\n " )
161
96
}
162
97
}
163
98
164
99
// CITicker - Periodic ticker for Cluster module
165
100
func (h * CIStateH ) CITicker () {
166
- mh .mtx .Lock ()
167
- h .CISync ()
168
- mh .mtx .Unlock ()
101
+ // Nothing to do currently
169
102
}
170
103
171
104
// CISpawn - Spawn CI application
172
- func (h * CIStateH ) CISpawn () {
173
- if h .SpawnKa {
174
- go kaSpawn ()
105
+ func (ci * CIStateH ) CISpawn () {
106
+ if ci .SpawnKa {
107
+ go ci . startBFDProto ()
175
108
}
176
109
}
177
110
178
111
// CIInit - routine to initialize Cluster context
179
- func CIInit (spawnKa bool , kaMode bool ) * CIStateH {
112
+ func CIInit (spawnKa bool , remoteIP net. IP ) * CIStateH {
180
113
var nCIh = new (CIStateH )
181
114
nCIh .StateMap = make (map [string ]int )
182
115
nCIh .StateMap ["MASTER" ] = cmn .CIStateMaster
@@ -185,7 +118,7 @@ func CIInit(spawnKa bool, kaMode bool) *CIStateH {
185
118
nCIh .StateMap ["STOP" ] = cmn .CIStateNotDefined
186
119
nCIh .StateMap ["NOT_DEFINED" ] = cmn .CIStateNotDefined
187
120
nCIh .SpawnKa = spawnKa
188
- nCIh .kaMode = kaMode
121
+ nCIh .RemoteIP = remoteIP
189
122
nCIh .ClusterMap = make (map [string ]* ClusterInstance )
190
123
191
124
if _ , ok := nCIh .ClusterMap [cmn .CIDefault ]; ! ok {
@@ -237,9 +170,9 @@ func (h *CIStateH) CIVipGet(inst string) (net.IP, error) {
237
170
return net .IPv4zero , errors .New ("not found" )
238
171
}
239
172
240
- // IsCIKAMode - routine to get HA state
173
+ // IsCIKAMode - routine to get KA mode
241
174
func (h * CIStateH ) IsCIKAMode () bool {
242
- return h . kaMode
175
+ return false
243
176
}
244
177
245
178
// CIStateUpdate - routine to update cluster state
@@ -274,6 +207,7 @@ func (h *CIStateH) CIStateUpdate(cm cmn.HASMod) (int, error) {
274
207
if mh .bgp != nil {
275
208
mh .bgp .UpdateCIState (cm .Instance , ci .State , ci .Vip )
276
209
}
210
+ mh .zr .Rules .RuleVIPSyncToClusterState ()
277
211
return ci .State , nil
278
212
}
279
213
0 commit comments