Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 86 additions & 21 deletions api/types/installers/agentless-installer.sh.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,84 @@ set -o errexit
set -o pipefail
set -o nounset

(
flock -n 9 || exit 1
if grep -q "Section created by 'teleport join openssh'" "$SSHD_CONFIG"; then
exit 0
run_teleport() {
TOKEN="$1"
PRINCIPALS="$2"
LABELS="$3"
ADDRESS="$4"

sudo /usr/local/bin/teleport join openssh \
--openssh-config="${SSHD_CONFIG}" \
--join-method=iam \
--token="$TOKEN" \
--proxy-server="{{ .PublicProxyAddr }}" \
--additional-principals="$PRINCIPALS" \
--labels="$LABELS" \
--address="$ADDRESS":22 \
--restart-sshd
}

get_metadata_item() {
IMDS_TOKEN="$1"
ENDPOINT="$2"

curl -m5 -sS -H "X-aws-ec2-metadata-token: ${IMDS_TOKEN}" "http://169.254.169.254/latest/meta-data/$ENDPOINT"
}

get_principals() {
IMDS_TOKEN="$1"

LOCAL_IP="$(get_metadata_item "$IMDS_TOKEN" local-ipv4)"
PUBLIC_IP="$(get_metadata_item "$IMDS_TOKEN" public-ipv4 || echo "")"

PRINCIPALS=""
if [ ! "$LOCAL_IP" = "" ]; then
PRINCIPALS="$LOCAL_IP,$PRINCIPALS"
fi
if [ ! "$PUBLIC_IP" = "" ]; then
PRINCIPALS="$PUBLIC_IP,$PRINCIPALS"
fi

echo "$PRINCIPALS"
}

get_address() {
IMDS_TOKEN="$1"

PUBLIC_IP=$(get_metadata_item "$IMDS_TOKEN" public-ipv4 || echo "")
if [ ! "$PUBLIC_IP" = "" ]; then
echo "$PUBLIC_IP"
return 0
fi

LOCAL_IP="$(get_metadata_item "$IMDS_TOKEN" local-ipv4)"
if [ ! "$LOCAL_IP" = "" ]; then
echo "$LOCAL_IP"
return 0
fi

echo "Failed to retreive an IP address to connect to, which is a required parameter"
return 1
}

get_labels() {
IMDS_TOKEN="$1"

INSTANCE_INFO=$(curl -m5 -sS -H "X-aws-ec2-metadata-token: ${IMDS_TOKEN}" http://169.254.169.254/latest/dynamic/instance-identity/document)

ACCOUNT_ID="$(echo "$INSTANCE_INFO" | jq -r .accountId)"
INSTANCE_ID="$(echo "$INSTANCE_INFO" | jq -r .instanceId)"
REGION="$(echo "$INSTANCE_INFO" | jq -r .region)"

LABELS="teleport.dev/instance-id=${INSTANCE_ID},teleport.dev/account-id=${ACCOUNT_ID},teleport.dev/aws-region=${REGION}"

echo "$LABELS"
}

install_teleport() {
. /etc/os-release

PACKAGE_LIST="{{ .TeleportPackage }}"
PACKAGE_LIST="jq {{ .TeleportPackage }}"
if [[ "{{ .AutomaticUpgrades }}" == "true" ]]; then
PACKAGE_LIST="${PACKAGE_LIST} {{ .TeleportPackage }}-updater"
fi
Expand Down Expand Up @@ -42,25 +111,21 @@ set -o nounset
echo "Unsupported distro: $ID"
exit 1
fi
}

IMDS_TOKEN=$(curl -m5 -sS -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 300")
LOCAL_IP=$(curl -m5 -sS -H "X-aws-ec2-metadata-token: ${IMDS_TOKEN}" http://169.254.169.254/latest/meta-data/local-ipv4)
PUBLIC_IP=$(curl -m5 -sS -H "X-aws-ec2-metadata-token: ${IMDS_TOKEN}" http://169.254.169.254/latest/meta-data/public-ipv4 || echo "")
(
flock -n 9 || exit 1

PRINCIPALS=""
if [ ! "$LOCAL_IP" = "" ]; then
PRINCIPALS="$LOCAL_IP,$PRINCIPALS"
fi
if [ ! "$PUBLIC_IP" = "" ]; then
PRINCIPALS="$PUBLIC_IP,$PRINCIPALS"
TOKEN="$1"

if ! test -f /usr/local/bin/teleport; then
install_teleport
fi

sudo /usr/bin/teleport join openssh \
--openssh-config="${SSHD_CONFIG}" \
--join-method=iam \
--token="$1" \
--proxy-server="{{ .PublicProxyAddr }}" \
--additional-principals="$PRINCIPALS" \
--restart-sshd
IMDS_TOKEN=$(curl -m5 -sS -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 300")
PRINCIPALS="$(get_principals "$IMDS_TOKEN")"
LABELS="$(get_labels "$IMDS_TOKEN")"
ADDRESS="$(get_address "$IMDS_TOKEN")"
run_teleport "$TOKEN" "$PRINCIPALS" "$LABELS" "$ADDRESS"

) 9>/var/lock/teleport_install.lock
1 change: 1 addition & 0 deletions lib/service/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (process *TeleportProcess) initDiscoveryService() error {
Emitter: asyncEmitter,
AccessPoint: accessPoint,
Log: process.log,
ClusterName: conn.ClientIdentity.ClusterName,
})
if err != nil {
return trace.Wrap(err)
Expand Down
121 changes: 102 additions & 19 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v3"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ssm"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -72,6 +71,8 @@ type Config struct {
// for all discovery services. If different agents are used to discover different
// sets of cloud resources, this field must be different for each set of agents.
DiscoveryGroup string
// ClusterName is the name of the Teleport cluster.
ClusterName string
}

func (c *Config) CheckAndSetDefaults() error {
Expand Down Expand Up @@ -124,6 +125,8 @@ type Server struct {
kubeFetchers []common.Fetcher
// databaseFetchers holds all database fetchers.
databaseFetchers []common.Fetcher
// caRotationCh receives nodes that need to have their CAs rotated.
caRotationCh chan []types.Server
}

// New initializes a discovery Server
Expand Down Expand Up @@ -169,10 +172,12 @@ func (s *Server) initAWSWatchers(matchers []services.AWSMatcher) error {
// start ec2 watchers
var err error
if len(ec2Matchers) > 0 {
s.ec2Watcher, err = server.NewEC2Watcher(s.ctx, ec2Matchers, s.Clients)
s.caRotationCh = make(chan []types.Server)
s.ec2Watcher, err = server.NewEC2Watcher(s.ctx, ec2Matchers, s.Clients, s.caRotationCh)
if err != nil {
return trace.Wrap(err)
}

s.ec2Installer = server.NewSSMInstaller(server.SSMInstallerConfig{
Emitter: s.Emitter,
})
Expand Down Expand Up @@ -330,13 +335,13 @@ func (s *Server) filterExistingEC2Nodes(instances *server.EC2Instances) {
return accountOK && instanceOK
})

var filtered []*ec2.Instance
var filtered []server.EC2Instance
outer:
for _, inst := range instances.Instances {
for _, node := range nodes {
match := types.MatchLabels(node, map[string]string{
types.AWSAccountIDLabel: instances.AccountID,
types.AWSInstanceIDLabel: aws.StringValue(inst.InstanceId),
types.AWSInstanceIDLabel: inst.InstanceID,
})
if match {
continue outer
Expand All @@ -347,9 +352,9 @@ outer:
instances.Instances = filtered
}

func genEC2InstancesLogStr(instances []*ec2.Instance) string {
return genInstancesLogStr(instances, func(i *ec2.Instance) string {
return aws.StringValue(i.InstanceId)
func genEC2InstancesLogStr(instances []server.EC2Instance) string {
return genInstancesLogStr(instances, func(i server.EC2Instance) string {
return i.InstanceID
})
}

Expand All @@ -376,15 +381,17 @@ func genInstancesLogStr[T any](instances []T, getID func(T) string) string {
}

func (s *Server) handleEC2Instances(instances *server.EC2Instances) error {
// TODO(amk): once agentless node inventory management is
// implemented, create nodes after a successful SSM run

// TODO(gavin): support assume_role_arn for ec2.
ec2Client, err := s.Clients.GetAWSSSMClient(s.ctx, instances.Region)
if err != nil {
return trace.Wrap(err)
}
s.filterExistingEC2Nodes(instances)
// instances.Rotation is true whenever the instances received need
// to be rotated, we don't want to filter out existing OpenSSH nodes as
// they all need to have the command run on them
if !instances.Rotation {
s.filterExistingEC2Nodes(instances)
}
if len(instances.Instances) == 0 {
return trace.NotFound("all fetched nodes already enrolled")
}
Expand All @@ -403,13 +410,96 @@ func (s *Server) handleEC2Instances(instances *server.EC2Instances) error {
return trace.Wrap(s.ec2Installer.Run(s.ctx, req))
}

func (s *Server) logHandleInstancesErr(err error) {
var aErr awserr.Error
if errors.As(err, &aErr) && aErr.Code() == ssm.ErrCodeInvalidInstanceId {
s.Log.WithError(err).Error("SSM SendCommand failed with ErrCodeInvalidInstanceId. Make sure that the instances have AmazonSSMManagedInstanceCore policy assigned. Also check that SSM agent is running and registered with the SSM endpoint on that instance and try restarting or reinstalling it in case of issues. See https://docs.aws.amazon.com/systems-manager/latest/APIReference/API_SendCommand.html#API_SendCommand_Errors for more details.")
} else if trace.IsNotFound(err) {
s.Log.Debug("All discovered EC2 instances are already part of the cluster.")
} else {
s.Log.WithError(err).Error("Failed to enroll discovered EC2 instances.")
}
}

func (s *Server) watchCARotation(ctx context.Context) {
ticker := time.NewTicker(time.Minute * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
nodes, err := s.findUnrotatedEC2Nodes(ctx)
if err != nil {
if trace.IsNotFound(err) {
s.Log.Debug("No OpenSSH nodes require CA rotation")
continue
}
s.Log.Errorf("Error finding OpenSSH nodes requiring CA rotation: %s", err)
continue
}
s.Log.Debugf("Found %d nodes requiring rotation", len(nodes))
s.caRotationCh <- nodes
case <-s.ctx.Done():
return
}
}
}

func (s *Server) getMostRecentRotationForCAs(ctx context.Context, caTypes ...types.CertAuthType) (time.Time, error) {
var mostRecentUpdate time.Time
for _, caType := range caTypes {
ca, err := s.AccessPoint.GetCertAuthority(ctx, types.CertAuthID{
Type: caType,
DomainName: s.ClusterName,
}, false)
if err != nil {
return time.Time{}, trace.Wrap(err)
}
caRot := ca.GetRotation()
if caRot.State == types.RotationStateInProgress && caRot.Started.After(mostRecentUpdate) {
mostRecentUpdate = caRot.Started
}

if caRot.LastRotated.After(mostRecentUpdate) {
mostRecentUpdate = caRot.LastRotated
}
}
return mostRecentUpdate, nil
}

func (s *Server) findUnrotatedEC2Nodes(ctx context.Context) ([]types.Server, error) {
mostRecentCertRotation, err := s.getMostRecentRotationForCAs(ctx, types.OpenSSHCA, types.HostCA)
if err != nil {
return nil, trace.Wrap(err)
}
found := s.nodeWatcher.GetNodes(ctx, func(n services.Node) bool {
if n.GetSubKind() != types.SubKindOpenSSHNode {
return false
}
if _, ok := n.GetLabel(types.AWSAccountIDLabel); !ok {
return false
}
if _, ok := n.GetLabel(types.AWSInstanceIDLabel); !ok {
return false
}

return mostRecentCertRotation.After(n.GetRotation().LastRotated)
})

if len(found) == 0 {
return nil, trace.NotFound("no unrotated nodes found")
}
return found, nil
}

func (s *Server) handleEC2Discovery() {
if err := s.nodeWatcher.WaitInitialization(); err != nil {
s.Log.WithError(err).Error("Failed to initialize nodeWatcher.")
return
}

go s.ec2Watcher.Run()
go s.watchCARotation(s.ctx)

for {
select {
case instances := <-s.ec2Watcher.InstancesC:
Expand All @@ -418,14 +508,7 @@ func (s *Server) handleEC2Discovery() {
instances.AccountID, genEC2InstancesLogStr(ec2Instances.Instances))

if err := s.handleEC2Instances(ec2Instances); err != nil {
var aErr awserr.Error
if errors.As(err, &aErr) && aErr.Code() == ssm.ErrCodeInvalidInstanceId {
s.Log.WithError(err).Error("SSM SendCommand failed with ErrCodeInvalidInstanceId. Make sure that the instances have AmazonSSMManagedInstanceCore policy assigned. Also check that SSM agent is running and registered with the SSM endpoint on that instance and try restarting or reinstalling it in case of issues. See https://docs.aws.amazon.com/systems-manager/latest/APIReference/API_SendCommand.html#API_SendCommand_Errors for more details.")
} else if trace.IsNotFound(err) {
s.Log.Debug("All discovered EC2 instances are already part of the cluster.")
} else {
s.Log.WithError(err).Error("Failed to enroll discovered EC2 instances.")
}
s.logHandleInstancesErr(err)
}
case <-s.ctx.Done():
s.ec2Watcher.Stop()
Expand Down
6 changes: 5 additions & 1 deletion lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/gravitational/teleport/lib/cloud/mocks"
libevents "github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/srv/server"
)

type mockSSMClient struct {
Expand Down Expand Up @@ -303,7 +304,7 @@ func TestDiscoveryServer(t *testing.T) {
logHandler: func(t *testing.T, logs io.Reader, done chan struct{}) {
scanner := bufio.NewScanner(logs)
instances := genEC2Instances(58)
findAll := []string{genEC2InstancesLogStr(instances[:50]), genEC2InstancesLogStr(instances[50:])}
findAll := []string{genEC2InstancesLogStr(server.ToEC2Instances(instances[:50])), genEC2InstancesLogStr(server.ToEC2Instances(instances[50:]))}
index := 0
for scanner.Scan() {
if index == len(findAll) {
Expand Down Expand Up @@ -368,6 +369,9 @@ func TestDiscoveryServer(t *testing.T) {
Regions: []string{"eu-central-1"},
Tags: map[string]utils.Strings{"teleport": {"yes"}},
SSM: &services.AWSSSM{DocumentName: "document"},
Params: services.InstallerParams{
InstallTeleport: true,
},
}},
Emitter: tc.emitter,
Log: logger,
Expand Down
6 changes: 5 additions & 1 deletion lib/srv/server/azure_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher {
}
}

func (*azureInstanceFetcher) GetMatchingInstances(_ []types.Server, _ bool) ([]Instances, error) {
return nil, trace.NotImplemented("not implemented for azure fetchers")
}

// GetInstances fetches all Azure virtual machines matching configured filters.
func (f *azureInstanceFetcher) GetInstances(ctx context.Context) ([]Instances, error) {
func (f *azureInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) {
instancesByRegion := make(map[string][]*armcompute.VirtualMachine)
for _, region := range f.Regions {
instancesByRegion[region] = []*armcompute.VirtualMachine{}
Expand Down
Loading