0%

webSocket实现网页端实时展示POD日志

网页端实时展示K8S容器日志

概览

需求:

  • 网页实时查看pod日志
  • 可请求指定行数的日志

思路:

  • Kubernetes Stream:接收请求并执行,以数据流的形式实时返回结果
  • Django Channels:维持长连接,接收前端请求后转给k8s,通过k8s sdk返回数据给前端
  • CodeMirror:前端组件,用于展示日志

基本数据流向为:

用户请求 –> 新建webSocket –> Django Channels –> Kubernetes SDK

代码

后端Django

模块

1
2
3
4
Django
channels
channels_redis
kubernetes

k8s.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import threading
import urllib3

from kubernetes import client, utils
from kubernetes.stream import stream

from apps.k8s import models

urllib3.disable_warnings()

# 接口地址
# https://github.com/kubernetes-client/python/blob/master/kubernetes/README.md


class K8sApi(object):
    """Small wrapper around the Kubernetes Python client for one stored cluster.

    The cluster is identified by the ``name`` of a ``models.K8s`` row, which
    supplies the API server URL and the bearer token used for every call.
    """

    # Number of trailing log lines used whenever the caller passes a falsy
    # ``number`` (None, 0, ""), for both the one-shot and the streaming read.
    DEFAULT_TAIL_LINES = 100

    def __init__(self, cluster=None, label_selector=None, namespace=None,):
        self.cluster = cluster
        self.label_selector = label_selector
        self.namespace = namespace

    def get_client(self):
        """Build an ``ApiClient`` from the stored settings of ``self.cluster``.

        Raises whatever ``models.K8s.objects.get`` raises when no (or more
        than one) row matches the cluster name.
        """
        k8s_qs = models.K8s.objects.get(name=self.cluster)
        configuration = client.Configuration()
        configuration.host = k8s_qs.api_server
        # NOTE(review): certificate verification is switched off here (and the
        # resulting urllib3 warnings are silenced at module level). Confirm
        # this is acceptable for the clusters being accessed.
        configuration.verify_ssl = False
        configuration.api_key = {"authorization": "Bearer " + k8s_qs.token}
        return client.ApiClient(configuration)

    def get_pod_log(self, name, namespace, container, number):
        """Return the last ``number`` timestamped log lines of a container.

        :param name: pod name.
        :param namespace: namespace the pod lives in.
        :param container: container inside the pod.
        :param number: how many trailing lines to fetch; a falsy value falls
            back to ``DEFAULT_TAIL_LINES``.
        :return: the result of ``CoreV1Api.read_namespaced_pod_log``.
        """
        client_v1 = client.CoreV1Api(self.get_client())
        return client_v1.read_namespaced_pod_log(
            name,
            namespace,
            container=container,
            pretty=True,
            tail_lines=number or self.DEFAULT_TAIL_LINES,
            timestamps=True,
        )

    def get_pod_log_stream(self, name, namespace, container, number):
        """Open a live (``follow=True``) log stream for a container.

        Parameters have the same meaning as in ``get_pod_log``. A falsy
        ``number`` falls back to ``DEFAULT_TAIL_LINES`` so that the stream
        never starts by replaying the whole log history.

        :return: the iterator produced by calling ``.stream()`` on the raw
            (``_preload_content=False``) response; it yields log data as it
            arrives.
        """
        client_v1 = client.CoreV1Api(self.get_client())
        response = client_v1.read_namespaced_pod_log(
            name=name,
            namespace=namespace,
            container=container,
            follow=True,
            pretty=True,
            # Keep the raw response so the body can be consumed incrementally.
            _preload_content=False,
            timestamps=True,
            tail_lines=number or self.DEFAULT_TAIL_LINES,
        )
        return response.stream()


class K8SLogStreamThread(threading.Thread):
    """Background thread that forwards chunks of a log stream to a websocket.

    :param websocket: object exposing ``send(bytes_data=...)`` and ``close()``.
    :param container_stream: iterable yielding chunks of log data.
    """

    def __init__(self, websocket, container_stream):
        super(K8SLogStreamThread, self).__init__()
        # The loop in run() can block indefinitely waiting for new log data;
        # a daemon thread does not keep the process alive in that case.
        self.daemon = True
        self.websocket = websocket
        self.stream = container_stream

    def run(self):
        """Forward every chunk, then release the stream and the websocket.

        An empty chunk is treated as end-of-stream. Cleanup happens in a
        ``finally`` so it also runs when the stream or the websocket raises
        (the exception itself still propagates and is reported by
        ``threading``).
        """
        try:
            for chunk in self.stream:
                if not chunk:
                    # Nothing may be sent once the websocket is closed, so
                    # stop instead of continuing the loop.
                    break
                self.websocket.send(bytes_data=chunk)
        finally:
            self._shutdown()

    def _shutdown(self):
        """Best-effort close of the log stream and of the websocket."""
        close_stream = getattr(self.stream, "close", None)
        if callable(close_stream):
            try:
                close_stream()
            except Exception:
                # Already closed or broken; nothing more to release.
                pass
        try:
            self.websocket.close()
        except Exception:
            # The peer may already have closed the connection.
            pass

asgi.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ASGI entry point: plain HTTP is served by Django, websocket connections are
# routed to the consumers listed in apps.k8s.routing.
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ops.settings')

# Initialise Django (settings + app registry) BEFORE importing the routing
# module: it pulls in the consumers, which in turn import ORM models, and
# importing models before the app registry is ready fails.
django_asgi_app = get_asgi_application()

import apps.k8s.routing  # noqa: E402  (must come after get_asgi_application())

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    # Websocket connections go through the auth middleware stack and are then
    # dispatched by URL.
    "websocket": AuthMiddlewareStack(
        URLRouter(
            apps.k8s.routing.websocket_urlpatterns
        )
    ),
})

routing.py

1
2
3
4
5
6
7
8
9
from django.urls import re_path, path

from . import consumers

# Websocket route for the pod log stream. The last path segment packs four
# underscore-separated values that are exposed to the consumer as the URL
# kwargs ``cluster``, ``namespace``, ``pod`` and ``container``.
# Example: 0.0.0.0:8080/ws/k8s/log/dev_default_promtail-test-64b9dbdbf8-jv4vv_busybox/
LOG_STREAM_PATTERN = (
    r"ws/k8s/log/"
    r"(?P<cluster>\w+)_"
    r"(?P<namespace>[-\w]+)_"
    r"(?P<pod>[-\w]+)_"
    r"(?P<container>[-\w]+)/$"
)

websocket_urlpatterns = [
    re_path(LOG_STREAM_PATTERN, consumers.LogConsumer.as_asgi()),
]

consumers.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from channels.generic.websocket import WebsocketConsumer
import urllib3
from common.k8s import K8sApi, K8SLogStreamThread

urllib3.disable_warnings()


class LogConsumer(WebsocketConsumer):
    """Websocket consumer that pushes the live log of one container to the client.

    The target is taken from the URL route kwargs ``cluster``, ``namespace``,
    ``pod`` and ``container``. After the connection is accepted a
    ``K8SLogStreamThread`` forwards the log stream to this websocket.
    """

    # Number of existing log lines replayed when a stream is opened.
    TAIL_LINES = 10

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Defined up front so disconnect() can rely on the attribute even if
        # the connection is torn down before _init() has run.
        self.stream = None

    def connect(self):
        """Accept the connection and start streaming."""
        self.accept()
        self._init()
        print("连接成功")

    def disconnect(self, close_code):
        """Release the log stream once the websocket has gone away.

        The socket is already closed when this hook runs, so no ``close()``
        is issued here; doing so could raise before the stream is released.
        """
        print(f"断开连接")
        if self.stream is None:
            return
        try:
            self.stream.close()
        except Exception as e:
            # Best effort: closing can fail, e.g. while the forwarding thread
            # is still reading from the stream. Report instead of hiding it.
            print(f"关闭日志流失败: {e}")

    def receive(self, text_data=None, bytes_data=None):
        """Ignore messages sent by the client.

        Channels invokes this hook with only one of the two keyword
        arguments, so both need defaults. The log stream is only ever
        iterated (by ``K8SLogStreamThread``); there is nothing to forward
        client input to.
        """
        return

    def _init(self):
        """Read the target from the URL, open the log stream, start the forwarder."""
        route_kwargs = self.scope['url_route']['kwargs']
        print(route_kwargs)
        self.cluster = route_kwargs['cluster']
        self.namespace = route_kwargs['namespace']
        self.pod = route_kwargs['pod']
        self.container = route_kwargs['container']
        self.kub = K8sApi(cluster=self.cluster, namespace=self.namespace)
        self.stream = None
        try:
            self.stream = self.kub.get_pod_log_stream(
                self.pod, self.namespace, self.container, self.TAIL_LINES)
        except Exception as e:
            # Unknown cluster, unreachable API server, missing pod, ...:
            # report the reason and drop the connection.
            print(f"获取日志流失败: {e}")
            self.close()
            return
        K8SLogStreamThread(self, self.stream).start()

前端VUE

index.vue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
<template>
  <!-- Pod log dialog: choose container and line count, optionally follow the live log. -->
  <el-dialog :title="dialogTitle" :visible.sync="dialogLog" @close="handleCloseLogDialog">
    <div style="margin-bottom:20px">
      容器:
      <el-select v-model="listQuery.container" placeholder="请选择Container" @change="getPodLog">
        <el-option
          v-for="item in containerOptions"
          :key="item"
          :label="item"
          :value="item"
        />
      </el-select>
      日志条数:
      <el-select v-model="listQuery.number" placeholder="请选择日志条数" @change="getPodLog">
        <el-option
          v-for="item in numberOptions"
          :key="item"
          :label="item"
          :value="item"
        />
      </el-select>
      是否查看实时日志:
      <el-switch
        v-model="isFllow"
        active-color="#13ce66"
        @change="handleChangeFllow"
      />
      <br>

      <el-input v-model="filter_log" style="margin:10px" placeholder="关键词过滤" />

    </div>
    <div class="editor-container">
      <!-- Static snapshot when not following, live viewer when following. -->
      <json-editor v-if="!isFllow" ref="jsonEditor" v-model="log" />
      <k8s-log v-if="isFllow" ref="jsonEditor" v-model="log" :read-only="logReadOnly" />
    </div>
  </el-dialog>
</template>

<script>
methods: {
handleCloseLogDialog() {
if (this.socket) {
this.close()
}
this.isFllow = false
},
handleChangeFllow() {
console.log(this.isFllow)
if (this.isFllow) {
this.log = ''
this.socketInit()
} else {
this.close()
}
},
handleGetLog(row) {
console.log()
this.listQuery.pod_name = row.name
this.getPodLog()
},
socketInit() {
const url = process.env.VUE_APP_BASE_WS + '/k8s/log/' + this.listQuery.cluster + '_' + this.listQuery.namespace + '_' + this.listQuery.pod_name + '_' + this.listQuery.container + '/'
console.log(url)
this.socket = new WebSocket(url)
this.socket.onopen = this.open
// 监听socket错误信息
this.socket.onerror = this.error
// 监听socket消息
this.socket.onmessage = this.getMessage
// 发送socket消息
this.socket.onsend = this.send
this.socket.onclose = this.close
},
open: function() {
console.log('socket连接成功')
// this.send(JSON.stringify(this.listQuery))
},
error: function() {
console.log('连接错误')
},
close: function(e) {
this.socket.close()
console.log('socket已经关闭', e)
},
getMessage: function(msg) {
var reader = new FileReader()

reader.onload = e => {
if (this.filter_log) {
if (reader.result.includes(this.filter_log)) {
this.log = this.log + reader.result
}
} else {
this.log = this.log + reader.result
}
}
reader.readAsText(msg.data)

// }
},
send: function(order) {
console.log(order)
this.socket.send(order)
},
getPodLog() {
this.dialogTitle = this.listQuery.name
getPodLogReq(this.listQuery).then((response) => {
this.containerOptions = response.data.container_list
this.listQuery.container = response.data.container
this.log = response.data.log
this.dialogLog = true
})
},
}
}
</script>

效果

参考文献:https://meaninglive.com/2021/02/25/python-django%E9%80%9A%E8%BF%87webssh%E6%93%8D%E4%BD%9Ckubernetes-pod/