demo: k8s pod exec over ssh

This commit is contained in:
Joe Kralicky 2025-03-23 20:47:25 +00:00
parent 17fedbfa6f
commit 8fee7e9930
No known key found for this signature in database
GPG key ID: 75C4875F34A9FB79
3 changed files with 207 additions and 19 deletions

View file

@ -52,6 +52,11 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)
type ActiveStreams struct {
@ -732,6 +737,21 @@ func marshalAny(msg proto.Message) *anypb.Any {
// should not automatically disconnect
var ErrHandoff = errors.New("handoff")
const loginScript = `` +
`user="%[1]s"; ` +
`if command -v getent >/dev/null 2>&1; then user_shell=$(getent passwd "$user" | cut -d: -f7); else user_shell="/bin/sh"; fi; ` +
`shell_basename=$(basename "$user_shell"); ` +
`if [ -z "$user_shell" ] || [ "$shell_basename" = "false" ] || [ "$shell_basename" = "nologin" ]; then ` +
` if [ -x /bin/bash ]; then ` +
` user_shell="/bin/bash"; ` +
` elif [ -x /bin/ash ]; then ` +
` user_shell="/bin/ash"; ` +
` else ` +
` user_shell="/bin/sh"; ` +
` fi; ` +
`fi; ` +
`exec /bin/su -s "$user_shell" "$user"`
func (a *Authorize) ServeChannel(
server extensions_ssh.StreamManagement_ServeChannelServer,
) error {
@ -740,6 +760,7 @@ func (a *Authorize) ServeChannel(
outputR, outputW := io.Pipe()
var peerId uint32
var activeProgram atomic.Pointer[tea.Program]
var activeSizeQueue atomic.Pointer[terminalSizeQueue]
errC := make(chan error, 1)
remoteWindow := &window{Cond: sync.NewCond(&sync.Mutex{})}
@ -961,6 +982,89 @@ func (a *Authorize) ServeChannel(
return err
}
if strings.Contains(state.Username, "://") {
u, err := url.Parse(state.Username)
if err == nil {
switch u.Scheme {
case "k8s":
user := u.User.Username()
_ = user
pod, namespace, _ := strings.Cut(u.Hostname(), ".")
// rules := clientcmd.NewDefaultClientConfigLoadingRules()
// apiConfig, err := rules.Load()
// if err != nil {
// return err
// }
// conf, err := clientcmd.NewDefaultClientConfig(
// *apiConfig, &clientcmd.ConfigOverrides{}).ClientConfig()
conf, err := rest.InClusterConfig()
if err != nil {
return fmt.Errorf("no in-cluster config available")
}
client, err := kubernetes.NewForConfig(conf)
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}
container := ""
if u.Path != "" {
container = strings.Trim(u.Path, "/")
if strings.Contains(container, "/") {
return fmt.Errorf("invalid container name %q", container)
}
}
req := client.CoreV1().RESTClient().
Get().
Resource("pods").
Namespace(namespace).
Name(pod).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: container,
Command: []string{"sh", "-c", fmt.Sprintf(loginScript, user)},
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)
executor, err := remotecommand.NewWebSocketExecutor(conf, "GET", req.URL().String())
if err != nil {
return fmt.Errorf("failed to create executor: %w", err)
}
go streamOutputToChannel(sendC, peerId, outputR)
go func() {
defer outputW.Close()
defer inputR.Close()
sizeC := make(chan *remotecommand.TerminalSize, 64)
sizeC <- &remotecommand.TerminalSize{
Width: uint16(downstreamPtyInfo.WidthColumns),
Height: uint16(downstreamPtyInfo.HeightRows),
}
defer close(sizeC)
queue := &terminalSizeQueue{C: sizeC}
activeSizeQueue.Store(queue)
defer activeSizeQueue.CompareAndSwap(queue, nil)
err = executor.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: inputR,
Stdout: outputW,
Tty: true,
TerminalSizeQueue: queue,
})
if err != nil {
errC <- err
} else {
sendC <- &extensions_ssh.ChannelControl{
Protocol: "ssh",
ControlAction: marshalAny(&extensions_ssh.SSHChannelControlAction_Disconnect{
ReasonCode: 11,
}),
}
}
}()
continue
}
}
}
cmd := a.NewSSHCLI(a.currentConfig.Load(), downstreamPtyInfo, downstreamChannelInfo, state, inputR, outputW, sendC, &activeProgram)
if msg.Request == "shell" {
cmd.SetArgs([]string{"portal"})
@ -1008,14 +1112,17 @@ func (a *Authorize) ServeChannel(
Width: int(req.WidthColumns),
Height: int(req.HeightRows),
})
} else if q := activeSizeQueue.Load(); q != nil {
q.C <- &remotecommand.TerminalSize{
Width: uint16(req.WidthColumns),
Height: uint16(req.HeightRows),
}
}
}
case msgChannelData:
var msg channelDataMsg
gossh.Unmarshal(rawMsg, &msg)
if activeProgram.Load() != nil {
inputW.Write(msg.Rest)
}
inputW.Write(msg.Rest)
case msgChannelClose:
var msg channelDataMsg
gossh.Unmarshal(rawMsg, &msg)
@ -1038,6 +1145,14 @@ func (a *Authorize) ServeChannel(
}
}
type terminalSizeQueue struct {
C chan *remotecommand.TerminalSize
}
func (t *terminalSizeQueue) Next() *remotecommand.TerminalSize {
return (<-t.C)
}
type ptyReq struct {
TermEnv string
Width, Height uint32
@ -1091,10 +1206,8 @@ func (a *Authorize) NewSSHCLI(
_, cmdIsInteractive := cmd.Annotations["interactive"]
switch {
case (ptyInfo == nil) && cmdIsInteractive:
cmd.SilenceUsage = true
return fmt.Errorf("\x1b[31m'%s' is an interactive command and requires a TTY (try passing '-t' to ssh)\x1b[0m", cmd.Use)
case (ptyInfo != nil) && !cmdIsInteractive:
cmd.SilenceUsage = true
return fmt.Errorf("\x1b[31m'%s' is not an interactive command (try passing '-T' to ssh, or removing '-t')\x1b[0m\r", cmd.Use)
}
return nil
@ -1105,6 +1218,7 @@ func (a *Authorize) NewSSHCLI(
cmd.AddCommand(a.NewLogoutCommand(cfg, sessionID))
cmd.AddCommand(a.NewWhoamiCommand(cfg, sessionID))
cmd.CompletionOptions.DisableDefaultCmd = true
cmd.SilenceUsage = true
cmd.SetIn(stdin)
cmd.SetOut(stdout)
cmd.SetErr(stdout)