Skip to content

Commit

Permalink
handle describeInstances eventual consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
vdhanan authored and Varun Dhananjaya committed Apr 19, 2021
1 parent a5882e3 commit 307ed14
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 33 deletions.
127 changes: 107 additions & 20 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ var (
volumeModificationDuration = 1 * time.Second
volumeModificationWaitFactor = 1.7
volumeModificationWaitSteps = 10

volumeAttachmentStatePollSteps = 13
)

const (
// Backoff parameters used by WaitForAttachmentState when polling a volume's attachment state.
// volumeAttachmentStatePollDelay is the initial polling interval of that backoff.
volumeAttachmentStatePollDelay = 1 * time.Second
// volumeAttachmentStatePollFactor is the factor by which the backoff grows the polling interval.
volumeAttachmentStatePollFactor = 1.8

// volumeDetachedState is the state DetachDisk waits for; WaitForAttachmentState also
// assumes it when the volume reports no attachment state.
volumeDetachedState = "detached"
// volumeAttachedState is the state AttachDisk waits for after requesting an attachment.
volumeAttachedState = "attached"
)

// AWS provisioning limits.
Expand Down Expand Up @@ -206,7 +216,7 @@ type Cloud interface {
AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error)
ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error)
WaitForAttachmentState(ctx context.Context, volumeID, state string) error
WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error)
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
IsExistInstance(ctx context.Context, nodeID string) (success bool)
Expand Down Expand Up @@ -401,15 +411,29 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string

}

attachment, err := c.WaitForAttachmentState(ctx, volumeID, volumeAttachedState, *instance.InstanceId, device.Path, device.IsAlreadyAssigned)

// This is the only situation where we taint the device
if err := c.WaitForAttachmentState(ctx, volumeID, "attached"); err != nil {
if err != nil {
device.Taint()
return "", err
}

// TODO: Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
// Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
// It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call,
// which could theoretically be against a different device (or even instance).
if attachment == nil {
// Impossible?
return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", volumeID, nodeID)
}
if device.Path != aws.StringValue(attachment.Device) {
// Already checked in WaitForAttachmentState(), but just to be sure...
return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", volumeID, nodeID, device.Path, aws.StringValue(attachment.Device))
}
if *instance.InstanceId != aws.StringValue(attachment.InstanceId) {
return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", volumeID, nodeID, *instance.InstanceId, aws.StringValue(attachment.InstanceId))
}

// TODO: Check volume capability matches for ALREADY_EXISTS
// This could happen when request volume already attached to request node,
// but is incompatible with the specified volume_capability or readonly flag
Expand Down Expand Up @@ -448,25 +472,32 @@ func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
return fmt.Errorf("could not detach volume %q from node %q: %v", volumeID, nodeID, err)
}

if err := c.WaitForAttachmentState(ctx, volumeID, "detached"); err != nil {
attachment, err := c.WaitForAttachmentState(ctx, volumeID, volumeDetachedState, *instance.InstanceId, "", false)
if err != nil {
return err
}
if attachment != nil {
// We expect it to be nil, it is (maybe) interesting if it is not
klog.V(2).Infof("waitForAttachmentState returned non-nil attachment with state=detached: %v", attachment)
}

return nil
}

// WaitForAttachmentState polls until the attachment status is the expected value.
func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, state string) error {
func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error) {
// Most attach/detach operations on AWS finish within 1-4 seconds.
// By using 1 second starting interval with a backoff of 1.8,
// we get [1, 1.8, 3.24, 5.832, 10.4976, ...].
// In total we wait for 2601 seconds.
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 13,
Duration: volumeAttachmentStatePollDelay,
Factor: volumeAttachmentStatePollFactor,
Steps: volumeAttachmentStatePollSteps,
}

var attachment *ec2.VolumeAttachment

verifyVolumeFunc := func() (bool, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
Expand All @@ -476,28 +507,84 @@ func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, state stri

volume, err := c.getVolume(ctx, request)
if err != nil {
return false, err
}

if len(volume.Attachments) == 0 {
if state == "detached" {
return true, nil
// The VolumeNotFound error is special -- we don't need to wait for it to repeat
if isAWSErrorVolumeNotFound(err) {
if expectedState == volumeDetachedState {
// The disk doesn't exist, assume it's detached, log warning and stop waiting
klog.Warningf("Waiting for volume %q to be detached but the volume does not exist", volumeID)
return true, nil
}
if expectedState == volumeAttachedState {
// The disk doesn't exist, complain, give up waiting and report error
klog.Warningf("Waiting for volume %q to be attached but the volume does not exist", volumeID)
return false, err
}
}

klog.Warningf("Ignoring error from describe volume for volume %q; will retry: %q", volumeID, err)
return false, nil
}

if len(volume.Attachments) > 1 {
// Shouldn't happen; log so we know if it is
klog.Warningf("Found multiple attachments for volume %q: %v", volumeID, volume)
}
attachmentState := ""
for _, a := range volume.Attachments {
if a.State == nil {
if attachmentState != "" {
// Shouldn't happen; log so we know if it is
klog.Warningf("Found multiple attachments for volume %q: %v", volumeID, volume)
}
if a.State != nil {
attachment = a
attachmentState = *a.State
} else {
// Shouldn't happen; log so we know if it is
klog.Warningf("Ignoring nil attachment state for volume %q: %v", volumeID, a)
continue
}
if *a.State == state {
return true, nil
}
if attachmentState == "" {
attachmentState = volumeDetachedState
}
if attachment != nil {
// AWS eventual consistency can go back in time.
// For example, we're waiting for a volume to be attached as /dev/xvdba, but AWS can tell us it's
// attached as /dev/xvdbb, where it was attached before and it was already detached.
// Retry a couple of times, hoping AWS starts reporting the right status.
device := aws.StringValue(attachment.Device)
if expectedDevice != "" && device != "" && device != expectedDevice {
klog.Warningf("Expected device %s %s for volume %s, but found device %s %s", expectedDevice, expectedState, volumeID, device, attachmentState)
return false, nil
}
instanceID := aws.StringValue(attachment.InstanceId)
if expectedInstance != "" && instanceID != "" && instanceID != expectedInstance {
klog.Warningf("Expected instance %s/%s for volume %s, but found instance %s/%s", expectedInstance, expectedState, volumeID, instanceID, attachmentState)
return false, nil
}
}

// If we expected the volume to be attached and it was reported as already attached via the DescribeInstances call,
// but DescribeVolumes told us the volume is detached, we short-circuit this long wait loop and return an error
// so that AttachDisk can be retried without waiting for 20 minutes.
if (expectedState == volumeAttachedState) && alreadyAssigned && (attachmentState != expectedState) {
return false, fmt.Errorf("attachment of disk %q failed, expected device to be attached but was %s", volumeID, attachmentState)
}

// Attachment is in requested state, finish waiting
if attachmentState == expectedState {
// But first, reset attachment to nil if expectedState equals volumeDetachedState.
// Caller will not expect an attachment to be returned for a detached volume if we're not also returning an error.
if expectedState == volumeDetachedState {
attachment = nil
}
return true, nil
}
// continue waiting
klog.V(2).Infof("Waiting for volume %q state: actual=%s, desired=%s", volumeID, attachmentState, expectedState)
return false, nil
}

return wait.ExponentialBackoff(backoff, verifyVolumeFunc)
return attachment, wait.ExponentialBackoff(backoff, verifyVolumeFunc)
}

func (c *cloud) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (*Disk, error) {
Expand Down Expand Up @@ -1066,7 +1153,7 @@ func volumeModificationDone(state string) bool {
func getVolumeAttachmentsList(volume *ec2.Volume) []string {
var volumeAttachmentList []string
for _, attachment := range volume.Attachments {
if attachment.State != nil && strings.ToLower(aws.StringValue(attachment.State)) == "attached" {
if attachment.State != nil && strings.ToLower(aws.StringValue(attachment.State)) == volumeAttachedState {
volumeAttachmentList = append(volumeAttachmentList, aws.StringValue(attachment.InstanceId))
}
}
Expand Down
Loading

0 comments on commit 307ed14

Please sign in to comment.