1 Star 0 Fork 43

liuxu/kubernetes

forked from src-openEuler/kubernetes 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0002-kubelet-support-exec-and-attach-websocket-protocol.patch 10.09 KB
一键复制 编辑 原始数据 按行查看 历史
liuxu 提交于 2024-04-26 16:27 . adapt old patch
From 5ad72e2b135afb5fbe5112f901bdec79f8943611 Mon Sep 17 00:00:00 2001
From: liuxu <liuxu156@huawei.com>
Date: Fri, 26 Apr 2024 10:49:14 +0800
Subject: [PATCH] kubelet support exec and attach websocket protocol
Signed-off-by: liuxu <liuxu156@huawei.com>
---
pkg/kubelet/server/server.go | 43 +++-
.../pkg/cri/streaming/remotecommand/proxy.go | 212 ++++++++++++++++++
2 files changed, 247 insertions(+), 8 deletions(-)
create mode 100644 staging/src/k8s.io/kubelet/pkg/cri/streaming/remotecommand/proxy.go
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
index 87a017f9..887e663a 100644
--- a/pkg/kubelet/server/server.go
+++ b/pkg/kubelet/server/server.go
@@ -834,51 +834,78 @@ func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
// getAttach handles requests to attach to a container.
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
- params := getExecRequestParams(request)
streamOpts, err := remotecommandserver.NewOptions(request.Request)
if err != nil {
utilruntime.HandleError(err)
response.WriteError(http.StatusBadRequest, err)
return
}
+
+ url, err := s.getAttachUrl(request, response, streamOpts)
+ if err != nil {
+ klog.Errorf("failed to get backend url %v", err)
+ return
+ }
+ if url.Scheme == "ws" || url.Scheme == "wss" {
+ remotecommandserver.ProxyToWebSocket(response.ResponseWriter, request.Request, url, streamOpts)
+ } else {
+ proxyStream(response.ResponseWriter, request.Request, url)
+ }
+}
+
+func (s *Server) getAttachUrl(request *restful.Request, response *restful.Response, streamOpts *remotecommandserver.Options) (*url.URL, error) {
+ params := getExecRequestParams(request)
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
- return
+ return nil, fmt.Errorf("pod not found")
}
podFullName := kubecontainer.GetPodFullName(pod)
url, err := s.host.GetAttach(request.Request.Context(), podFullName, params.podUID, params.containerName, *streamOpts)
if err != nil {
streaming.WriteError(err, response.ResponseWriter)
- return
+ return nil, err
}
- proxyStream(response.ResponseWriter, request.Request, url)
+ return url, nil
}
// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
- params := getExecRequestParams(request)
streamOpts, err := remotecommandserver.NewOptions(request.Request)
if err != nil {
utilruntime.HandleError(err)
response.WriteError(http.StatusBadRequest, err)
return
}
+ url, err := s.getExecUrl(request, response, streamOpts)
+ if err != nil {
+ klog.Errorf("failed to get backend url %v", err)
+ return
+ }
+ if url.Scheme == "ws" || url.Scheme == "wss" {
+ remotecommandserver.ProxyToWebSocket(response.ResponseWriter, request.Request, url, streamOpts)
+ } else {
+ proxyStream(response.ResponseWriter, request.Request, url)
+ }
+}
+
+func (s *Server) getExecUrl(request *restful.Request, response *restful.Response, streamOpts *remotecommandserver.Options) (*url.URL, error) {
+ params := getExecRequestParams(request)
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
- return
+ return nil, fmt.Errorf("pod not found")
}
podFullName := kubecontainer.GetPodFullName(pod)
url, err := s.host.GetExec(request.Request.Context(), podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
if err != nil {
streaming.WriteError(err, response.ResponseWriter)
- return
+ return nil, err
}
- proxyStream(response.ResponseWriter, request.Request, url)
+ return url, nil
}
// getRun handles requests to run a command inside a container.
diff --git a/staging/src/k8s.io/kubelet/pkg/cri/streaming/remotecommand/proxy.go b/staging/src/k8s.io/kubelet/pkg/cri/streaming/remotecommand/proxy.go
new file mode 100644
index 00000000..179d8183
--- /dev/null
+++ b/staging/src/k8s.io/kubelet/pkg/cri/streaming/remotecommand/proxy.go
@@ -0,0 +1,212 @@
+package remotecommand
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/gorilla/websocket"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/apis/meta/v1"
+ remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/client-go/util/exec"
+ "k8s.io/klog/v2"
+)
+
+var (
+ streamIdleTimeout = 4 * time.Hour
+ streamCreationTimeout = remotecommandconsts.DefaultStreamCreationTimeout
+)
+
+// proxyStreamToWebSocket proxies stream to url with websocket.
+func ProxyToWebSocket(w http.ResponseWriter, r *http.Request, url *url.URL, opts *Options) {
+ klog.V(8).Infof("start proxy request to websocket %+v", r)
+ ctx, ok := createStreams(
+ r,
+ w,
+ opts,
+ remotecommandconsts.SupportedStreamingProtocols,
+ streamIdleTimeout,
+ streamCreationTimeout)
+ if !ok {
+ msg := "failed to create stream to fontend"
+ klog.Error(msg)
+ http.Error(w, msg, http.StatusInternalServerError)
+ return
+ }
+ defer func() {
+ if err := ctx.conn.Close(); err != nil {
+ klog.Errorf("failed to close connection, %v", err)
+ }
+ }()
+
+ klog.V(8).Infof("start connecting to websocket %s", url.String())
+ backendConn, err := connectBackend(url.String(), "channel.k8s.io", r)
+ if err != nil {
+ msg := fmt.Sprintf("connectBackend failed: %v", err)
+ klog.Error(msg)
+ http.Error(w, msg, http.StatusInternalServerError)
+ return
+ }
+ defer backendConn.Close()
+
+ var errConnection error
+ frontendStdinToBackendComplete := make(chan struct{})
+ frontendResizeToBackendComplete := make(chan struct{})
+ backendToFrontendComplete := make(chan struct{})
+
+ go func() {
+ for {
+ _, msg, err := backendConn.ReadMessage()
+ if err != nil {
+ e, ok := err.(*websocket.CloseError)
+ if !ok || e.Code != websocket.CloseNormalClosure {
+ errConnection = err
+ }
+ break
+ }
+
+ if len(msg) < 1 {
+ errConnection = fmt.Errorf("received err msg from backEnd (the length less than 1), msg: %s", string(msg))
+ break
+ }
+
+ switch msg[0] {
+ case stdoutChannel:
+ _, err = ctx.stdoutStream.Write(msg[1:])
+ case stderrChannel:
+ _, err = ctx.stderrStream.Write(msg[1:])
+ case errorChannel:
+ err = ctx.writeStatus(apierrors.NewInternalError(errors.New(string(msg[1:]))))
+ default:
+ err = fmt.Errorf("received invalid msg from backEnd, msg: %s", string(msg))
+ }
+
+ if err != nil {
+ errConnection = err
+ break
+ }
+ }
+ close(backendToFrontendComplete)
+ }()
+
+ if opts.Stdin {
+ go func() {
+ r := &rwc{
+ c: backendConn,
+ index: stdinChannel,
+ }
+ _, err := io.Copy(r, ctx.stdinStream)
+ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
+ errConnection = fmt.Errorf("copy data from frontend(stdinStream) to backend failed, err: %v", err)
+ }
+ close(frontendStdinToBackendComplete)
+ }()
+ }
+
+ if opts.TTY {
+ go func() {
+ r := &rwc{
+ c: backendConn,
+ index: resizeChannel,
+ }
+ _, err := io.Copy(r, ctx.resizeStream)
+ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
+ errConnection = fmt.Errorf("copy data from frontend(resizeStream) to backend failed, err: %v", err)
+ }
+ close(frontendResizeToBackendComplete)
+ }()
+ }
+ select {
+ case <-backendToFrontendComplete:
+ case <-frontendStdinToBackendComplete:
+ case <-frontendResizeToBackendComplete:
+ }
+
+ select {
+ case <-backendToFrontendComplete:
+ case <-time.Tick(30 * time.Second):
+ klog.Errorf("Wait backend to frontend complete timeout")
+ }
+
+ if errConnection != nil {
+ klog.Errorf("SpdyProxy: the connection disconnected: %v", errConnection)
+ if exitErr, ok := errConnection.(exec.ExitError); ok && exitErr.Exited() {
+ rc := exitErr.ExitStatus()
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: v1.Status{
+ Status: v1.StatusFailure,
+ Reason: remotecommandconsts.NonZeroExitCodeReason,
+ Details: &v1.StatusDetails{
+ Causes: []v1.StatusCause{
+ {
+ Type: remotecommandconsts.ExitCodeCauseType,
+ Message: fmt.Sprintf("%d", rc),
+ },
+ },
+ },
+ Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
+ }})
+ } else if closeErr, ok := errConnection.(*websocket.CloseError); !ok || closeErr.Text != io.ErrUnexpectedEOF.Error() {
+ //ignore this ErrUnexpectedEOF because isulad always close the connection while reading a frame
+ err = fmt.Errorf("error executing command in container: %v", errConnection)
+ runtime.HandleError(err)
+ ctx.writeStatus(apierrors.NewInternalError(err))
+ }
+ } else {
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: v1.Status{
+ Status: v1.StatusSuccess,
+ }})
+ }
+}
+
+func connectBackend(addr, subprotocol string, r *http.Request) (*websocket.Conn, error) {
+ h := http.Header{}
+ originHeadValue := r.Header.Get("Origin")
+ if originHeadValue != "" {
+ h["Origin"] = []string{originHeadValue}
+ }
+ websocket.DefaultDialer.Subprotocols = []string{subprotocol}
+ websocket.DefaultDialer.ReadBufferSize = 128 * 1024
+ websocket.DefaultDialer.WriteBufferSize = 128 * 1024
+ ws, resp, err := websocket.DefaultDialer.Dial(addr, h)
+ if err == nil {
+ return ws, nil
+ }
+ msg := fmt.Errorf("dial failed: %v, response Body is nil", err)
+ if resp != nil && resp.Body != nil {
+ defer func() {
+ //websocket buffer size maybe not enough and cause panic
+ if e := recover(); e != nil {
+ msg = fmt.Errorf("dial failed: %v, response panic %v", err, e)
+ }
+ resp.Body.Close()
+ }()
+ var body bytes.Buffer
+ body.ReadFrom(resp.Body)
+ msg = fmt.Errorf("dial failed: %v, response is: %v", err, body.String())
+ }
+ return nil, msg
+}
+
+type rwc struct {
+ c *websocket.Conn
+ index byte
+}
+
+func (c *rwc) Write(p []byte) (int, error) {
+ frame := make([]byte, len(p)+1)
+ frame[0] = byte(c.index)
+ copy(frame[1:], p)
+
+ err := c.c.WriteMessage(websocket.BinaryMessage, frame)
+ if err != nil {
+ return 0, err
+ }
+ return len(p), nil
+}
--
2.34.1
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liuxu180400617/kubernetes.git
git@gitee.com:liuxu180400617/kubernetes.git
liuxu180400617
kubernetes
kubernetes
master

搜索帮助