代码发布5 发布流程, 基于channel-layers实现群发功能, 节点图标的创建, 节点状态动态改变, consumers.py/deploy.html代码示例

发布流程

给任务单的展示页面,添加一个去发布的按钮,点击进入发布界面(项目基本介绍,gojs渲染图标)

关于静态文件夹static既可以在全局创建,也可以在每一个应用创建

"""静态文件配置"""
# 1.配置文件中直接指定查找路径(从上往下查找)      方式一
STATICFILES_DIRS = [
  os.path.join(BASE_DIR,'static1'),
  os.path.join(BASE_DIR,'static2'),
  os.path.join(BASE_DIR,'static3'),
  os.path.join(BASE_DIR,'static4'),
]
# 2.直接利用模版语法     方式二
{% load staticfiles %}
<script src="{% static 'js/go.js' %}"></script>
# 配置文件中配置
STATIC_URL = '/static/'

初始化图标数据应该来自于后端(gojs数据绑定)

当多个用户打开相同的发布页面的时候,只要有一个人在操作,其他人的界面都应该看到效果(channle-layers)

channle-layers基本使用

channnel-layers常用的方法

# 1 如何获取url中无名有名分组的参数
self.scope['url_route']['kwargs'].get('kwargs')
self.scope['url_route']['args'].get('args')

# 2 请求链接成功之后加入对应的群聊中
from asgiref.sync import async_to_sync
async_to_sync(self.channel_layer.group_add)('群号',self.channel_name)

# 3 给客户端发送消息(群发)
async_to_sync(self.channel_layer.group_send)('群号',{'type':'xx.oo','message':'{"code":True,"data":"jason"}'})

# type后面定义的是方法 你需要在当前类下面定义一个方法
def xx_oo(self,event):
  """类似于循环当前群聊里面的所有成员 依次发送数据"""
  data = event.get('message')
  self.send(json.dumps(data))

# 4 断开链接应该将对应群聊里面的用户剔除
async_to_sync(self.channel_layer.group_discard)('群号',self.channel_name)
  • 配置文件中配置参数

CHANNEL_LAYERS = {
    'default':{
        'BACKEND':'channels.layers.InMemoryChannelLayer'
    }
}
  • 如何获取无名有名分组中url携带的参数 (因为此处用类,无法像方法获得有名分组参数)
task_id = self.scope['url_route']['kwargs'].get('task_id')

代码:

# routing.py
application = ProtocolTypeRouter({
    'websocket':URLRouter([
        re_path(r'^publish/(?P<task_id>\d+)',consumers.PublishConsumer)
    ])
})

# consumers.py
class PublishConsumer(WebsocketConsumer):
    def websocket_connect(self, message):
        self.accept()
        # 获取url中无名有名分组的参数   self.scope是一个大字典  里面包含了前端所有的数据
        task_id = self.scope['url_route']['kwargs'].get('task_id')  # scope['url_route']为固定参数,获取有名分组参数
        # task_id = self.scope['url_route']['args'].get('task_id')    # 无名分组获取参数
  • 链接对象自动加入对应的群聊
from asgiref.sync import async_to_sync
async_to_sync(self.channel_layer.group_add)(task_id,self.channel_name)
  • 给特定的群中发消息
async_to_sync(self.channel_layer.group_send)(task_id,{'type':'my.send','message':{'code':'init','data':node_list}})
"""
            后面字典的键是固定的 就叫type和message
            type后面指定的值就是负责发送消息的方法(将message后面的数据交由type后面指定的方法发送给对应的群聊中)
            针对type后面的方法名 有一个固定的变化格式
            my.send     >>>    my_send
            xxx.ooo     >>>    xxx_ooo
"""
  • 在类中需要定义一个专门发送消息的方法
def my_send(self,event):   # event为侯敏的消息对象
        message = event.get('message')  # {'code':'init','data':node_list}
        # 发送数据
        self.send(json.dumps(message))
        """
        内部原理就类似于是循环当前群组里面所有的链接对象 然后依次执行send方法
        for self in self_list:
            self.send
        """
  • 断开链接之后去对应的群聊中剔除群成员
async_to_sync(self.channel_layer.group_discard)(task_id,self.channel_name)

如何区分不同的任务直接群发消息混乱的情况,针对群号应该做区分

其实可以直接使用任务的主键值作为群号

节点数据展示

节点数据无论是初始化的还是动态修改的,都应该是动态生成的而不是直接写死的

其次,当一个任务单以及发布之后,应该保存它的发布节点数据

也就意味着我们需要开设一个模型表用开存储节点相关的所有数据

class Node(models.Model):
    """存储节点数据"""
    text = models.CharField(verbose_name='节点文字',max_length=32)
    # 一个任务单有多个节点  一对多的关系
    task = models.ForeignKey(verbose_name='发布任务单',to='DeployTask')

    status_choices = (
        ('lightgray','待发布'),
        ('green','成功'),
        ('red','失败'),
    )
    status = models.CharField(verbose_name='状态',max_length=32,choices=status_choices,default='lightgray')

    # 子节点 父节点  bbs子评论与根评论
    parent = models.ForeignKey(verbose_name='父节点',to='self',null=True,blank=True)
    # 一个服务器可以有多个节点 一对多的关系
    server = models.ForeignKey(verbose_name='服务器',to='Server',null=True,blank=True)

当用户点击初始化图标的时候,需要操作上述的表查询或者新增记录

# 先去模型表中创建数据 之后再构造gojs所需要的数据类型返回即可
# 群发
            node_object_list = []
            # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点)
            start_node = models.Node.objects.create(text='开始',
                                                    task_id=task_id
                                                    )
            node_object_list.append(start_node)

            download_node = models.Node.objects.create(text='下载',
                                                       task_id=task_id,
                                                       parent=start_node
                                                       )
            node_object_list.append(download_node)

            upload_node = models.Node.objects.create(text='上传',
                                                     task_id=task_id,
                                                     parent=download_node
                                                     )
            node_object_list.append(upload_node)
            # 1.1  服务器相关的节点  一个项目可以有多个服务器
            task_obj = models.DeployTask.objects.filter(pk=task_id).first()
            for server_obj in task_obj.project.servers.all():
                server_node = models.Node.objects.create(text=server_obj.hostname,
                                                         task_id=task_id,
                                                         parent=upload_node,
                                                         server=server_obj
                                                         )
                node_object_list.append(server_node)

            # 2 再将数据构造成gojs所需的格式返回给前端  [{}]
            node_list = []  # [{},{},{},{}]
            for node_obj in node_object_list:
                temp_dic = {
                    'key':str(node_obj.pk),
                    'text':node_obj.text,
                    'color':node_obj.status,
                }
                # 针对parant字段 需要做判断 再考虑是否创建键值对
                if node_obj.parent:
                    # 我们用数据的主键值作为key
                    temp_dic['parent'] = str(node_obj.parent_id)
                node_list.append(temp_dic)

            async_to_sync(self.channel_layer.group_send)(task_id,{'type':'my.send','message':{'code':'init','data':node_list}})

小bug优化

# 1 当一个任务以及初始化过了 再次点击初始化按钮 数据库不应该再写入数据
# 先判断当前任务单是否已经初始化过图标数据了
node_queryset = models.Node.objects.filter(task_id=task_id)
if not node_queryset:
  # 创建和构造数据的操作
else:
  node_object_list = node_queryset

# 2 当用户以及给一个任务单初始化过图标之后。打开页面不应该在此点击按钮 而是直接展示出来  在后端建立连接的方法内 直接查询并返回数据
# 查询当前任务单是否已经初始化过图标  如果有直接查询出来展示到前端  减少用户操作
        node_queryset = models.Node.objects.filter(task_id=task_id)
        if node_queryset:
            node_list = []  # [{},{},{},{}]
            for node_obj in node_queryset:
                temp_dic = {
                    'key': str(node_obj.pk),
                    'text': node_obj.text,
                    'color': node_obj.status,
                }
                # 针对parant字段 需要做判断 再考虑是否创建键值对
                if node_obj.parent:
                    # 我们用数据的主键值作为key
                    temp_dic['parent'] = str(node_obj.parent_id)
                node_list.append(temp_dic)
            # 发送数据  单发/群发???   单发
            self.send(text_data=json.dumps({'code':'init','data':node_list}))

钩子节点展示

钩子脚本内容思路

  • 直接自己规定死只能写shell脚本或者python脚本

  • 兼容各个类型的脚本(脚本表中再开设一个用来标示脚本类型的字段)

  • 通过文件头来指定

判断用户是否书写的钩子脚本

# 判断是否有下载前的钩子脚本
                if task_obj.before_download_script:
                    # 再创建一个下载前的节点
                    start_node = models.Node.objects.create(text='下载前',
                                                            task_id=task_id,
                                                            parent=start_node
                                                            )
                    node_object_list.append(start_node)

"""利用变量名只想的问题来实现箭头指向"""

代码优化

对代码进行封装和优化

# 1 wesocket_recevie方法内的代码过于冗杂  根据功能的不同做优化
"""
将创建节点数据的代码 和 构造gojs所需要的代码封装成两个函数 
"""
def create_node(task_id,task_obj):
    """创建节点数据"""
    node_object_list = []
    # 先判断当前任务单是否已经初始化过图标数据了
    node_queryset = models.Node.objects.filter(task_id=task_id)
    if not node_queryset:
        # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点)
        start_node = models.Node.objects.create(text='开始',
                                                task_id=task_id
                                                )
        node_object_list.append(start_node)

        # 判断是否有下载前的钩子脚本
        if task_obj.before_download_script:
            # 再创建一个下载前的节点
            start_node = models.Node.objects.create(text='下载前',
                                                    task_id=task_id,
                                                    parent=start_node
                                                    )
            node_object_list.append(start_node)

        download_node = models.Node.objects.create(text='下载',
                                                   task_id=task_id,
                                                   parent=start_node
                                                   )
        node_object_list.append(download_node)

        # 判断是否有下载后的钩子脚本
        if task_obj.after_download_script:
            # 再创建一个下载后的节点
            download_node = models.Node.objects.create(text='下载后',
                                                       task_id=task_id,
                                                       parent=download_node
                                                       )
            node_object_list.append(download_node)

        upload_node = models.Node.objects.create(text='上传',
                                                 task_id=task_id,
                                                 parent=download_node
                                                 )
        node_object_list.append(upload_node)
        # 1.1  服务器相关的节点  一个项目可以有多个服务器
        task_obj = models.DeployTask.objects.filter(pk=task_id).first()
        for server_obj in task_obj.project.servers.all():
            server_node = models.Node.objects.create(text=server_obj.hostname,
                                                     task_id=task_id,
                                                     parent=upload_node,
                                                     server=server_obj
                                                     )
            node_object_list.append(server_node)

            # 判断是否有发布前的钩子
            if task_obj.before_deploy_script:
                # 再创建一个下载后的节点
                server_node = models.Node.objects.create(text='发布前',
                                                         task_id=task_id,
                                                         parent=server_node,
                                                         server=server_obj
                                                         )
                node_object_list.append(server_node)

            # 先再创建一个发布节点
            deploy_node = models.Node.objects.create(text='发布',
                                                     task_id=task_id,
                                                     parent=server_node,
                                                     server=server_obj
                                                     )
            node_object_list.append(deploy_node)

            # 判断是否有发布后的钩子
            if task_obj.after_deploy_script:
                # 再创建一个下载后的节点
                after_deploy_node = models.Node.objects.create(text='发布后',
                                                               task_id=task_id,
                                                               parent=deploy_node,
                                                               server=server_obj
                                                               )
                node_object_list.append(after_deploy_node)
    else:
        node_object_list = node_queryset
    return node_object_list

def convert_data_to_gojs(node_object_list):
    """将数据转化成gojs所需要的类型"""
    # 2 再将数据构造成gojs所需的格式返回给前端  [{}]
    node_list = []  # [{},{},{},{}]
    for node_obj in node_object_list:
        temp_dic = {
            'key': str(node_obj.pk),
            'text': node_obj.text,
            'color': node_obj.status,
        }
        # 针对parant字段 需要做判断 再考虑是否创建键值对
        if node_obj.parent:
            # 我们用数据的主键值作为key
            temp_dic['parent'] = str(node_obj.parent_id)
        node_list.append(temp_dic)
    return node_list

强调:有点一定要是先写好,之后再去优化,不要同步进行

节点动态变化

节点背后对应的操作,无外乎就是下载代码(gitpython模块的运用)

其次就是将下载好的代码上传到远程服务器上(paramiko模块的运用)

# 给发布按钮绑定事件 点击触发节点背后对应的所有的操作

consumers.py代码示例

from channels.generic.websocket import WebsocketConsumer
from channels.exceptions import StopConsumer
import json
from asgiref.sync import async_to_sync
from app01 import models


def create_node(task_id,task_obj):
    """创建节点数据"""
    node_object_list = []
    # 先判断当前任务单是否已经初始化过图标数据了
    node_queryset = models.Node.objects.filter(task_id=task_id)
    if not node_queryset:
        # 1 先去节点表中创建数据(1 先不考虑钩子节点 只创建基本的节点)
        start_node = models.Node.objects.create(text='开始',task_id=task_id)
        node_object_list.append(start_node)

        # 判断是否有下载前的钩子脚本
        if task_obj.before_download_script:
            # 再创建一个下载前的节点
            start_node = models.Node.objects.create(text='下载前',task_id=task_id,parent=start_node)
            node_object_list.append(start_node)

        download_node = models.Node.objects.create(text='下载',task_id=task_id,parent=start_node)
        node_object_list.append(download_node)

        # 判断是否有下载后的钩子脚本
        if task_obj.after_download_script:
            # 再创建一个下载后的节点
            download_node = models.Node.objects.create(text='下载后',task_id=task_id,parent=download_node)
            node_object_list.append(download_node)

        upload_node = models.Node.objects.create(text='上传',task_id=task_id,parent=download_node)
        node_object_list.append(upload_node)
        # 1.1  服务器相关的节点  一个项目可以有多个服务器
        task_obj = models.DeployTask.objects.filter(pk=task_id).first()
        for server_obj in task_obj.project.servers.all():
            server_node = models.Node.objects.create(text=server_obj.hostname,task_id=task_id,
                                                     parent=upload_node,server=server_obj)
            node_object_list.append(server_node)

            # 判断是否有发布前的钩子
            if task_obj.before_deploy_script:
                # 再创建一个下载后的节点
                server_node = models.Node.objects.create(text='发布前',task_id=task_id,
                                                         parent=server_node,server=server_obj)
                node_object_list.append(server_node)

            # 先再创建一个发布节点
            deploy_node = models.Node.objects.create(text='发布',task_id=task_id,
                                                     parent=server_node,server=server_obj)
            node_object_list.append(deploy_node)

            # 判断是否有发布后的钩子
            if task_obj.after_deploy_script:
                # 再创建一个下载后的节点
                after_deploy_node = models.Node.objects.create(text='发布后',task_id=task_id,
                                                               parent=deploy_node,server=server_obj)
                node_object_list.append(after_deploy_node)
    else:
        node_object_list = node_queryset
    return node_object_list


def convert_data_to_gojs(node_object_list):
    """将数据转化成gojs所需要的类型"""
    # 2 再将数据构造成gojs所需的格式返回给前端  [{}]
    node_list = []  # [{},{},{},{}]
    for node_obj in node_object_list:
        temp_dic = {
            'key': str(node_obj.pk),
            'text': node_obj.text,
            'color': node_obj.status,
        }
        # 针对parant字段 需要做判断 再考虑是否创建键值对
        if node_obj.parent:
            # 我们用数据的主键值作为key
            temp_dic['parent'] = str(node_obj.parent_id)
        node_list.append(temp_dic)
    return node_list


class PublishConsumer(WebsocketConsumer):
    def websocket_connect(self, message):
        self.accept()
        # 获取url中有名分组的参数  self.scope是一个大字典 里面包含了前端所有的数据
        task_id = self.scope['url_route']['kwargs'].get('task_id')
        # 应该将该用户添加到对应的群聊中  固定写法
        async_to_sync(self.channel_layer.group_add)(task_id,self.channel_name)
        # 第一个参数是群号 必须是字符串格式
        # 第二个参数类似于群成员的唯一标示

        # 查询当前任务单是否已经初始化过图标  如果有直接查询出来展示到前端  减少用户操作
        node_queryset = models.Node.objects.filter(task_id=task_id)
        if node_queryset:
            node_list = convert_data_to_gojs(node_queryset)
            # 发送数据  单发/群发???   单发
            self.send(text_data=json.dumps({'code':'init','data':node_list}))


    def websocket_receive(self, message):
        task_id = self.scope['url_route']['kwargs'].get('task_id')
        task_obj = models.DeployTask.objects.filter(pk=task_id).first()
        text = message.get('text')
        # 由于请求图标的数据 可以分为两部分 第一部分是初始化 第二部分是动态变化
        # 我们对text做判断 来区分到底需要哪部分数据
        if text == 'init':
            # node_list = [
            #         {"key": "start", "text": '开始', "figure": 'Ellipse', "color": "lightgreen"},
            #         {"key": "download", "parent": 'start', "text": '下载代码', "color": "lightgreen", "link_text": '执行中...'},
            #         {"key": "compile", "parent": 'download', "text": '本地编译', "color": "lightgreen"},
            #         ]
            # 要对数据进行序列化处理  单发
            # self.send(text_data=json.dumps({"code":'init',"data":node_list}))
            # 一个操作节点模型表完成数据展示
            # 群发
            # 1.创建节点数据
            node_object_list = create_node(task_id,task_obj)
            # 2.构造gojs所需要的数据类型
            node_list = convert_data_to_gojs(node_object_list)
            async_to_sync(self.channel_layer.group_send)(task_id,{'type':'my.send','message':{'code':'init','data':node_list}})
            """
            后面字典的键是固定的 就叫type和message
            type后面指定的值就是负责发送消息的方法(将message后面的数据交由type后面指定的方法发送给对应的群聊中)
            针对type后面的方法名 有一个固定的变化格式
            my.send     >>>    my_send
            xxx.ooo     >>>    xxx_ooo
            """
        if text == 'deploy':
            """
            1 先默认让所有的节点执行成功         
            2 再真正的执行命令操作
            
            第一步
                开始节点 开始节点无需任何操作 直接成功即可 
            第二步
                下载前 执行本地脚本 执行成功或者失败
            第三步
                下载 利用gitpython做操作
            第四步
                下载后 执行本地脚本 执行成功或者失败
            第五步
                上传  paramiko模块
            第六步
                链接每天服务器
                上传代码
                发布前钩子
                发布
                发布后钩子
            """

    def my_send(self,event):
        message = event.get('message')  # {'code':'init','data':node_list}
        # 发送数据
        self.send(json.dumps(message))
        """
        内部原理就类似于是循环当前群组里面所有的链接对象 然后依次执行send方法
        for self in self_list:
            self.send
        """

    def websocket_disconnect(self, message):
        task_id = self.scope['url_route']['kwargs'].get('task_id')
        async_to_sync(self.channel_layer.group_discard)(task_id,self.channel_name)
        raise StopConsumer()

deploy.html代码示例

{% extends 'base.html' %}
{% load staticfiles %}

{% block content %}
    <h1>发布任务</h1>
{#    1 操作按钮区域#}
    <div >
    <button class="btn btn-primary" onclick="initDiagram()">初始化图标</button>
    <button  class="btn btn-primary" onclick="releaseTask()">发布任务</button>
    </div>
{#    2 基本信息展示区#}
    <table class="table table-hover table-striped">
        <tbody>
        <tr>
            <td>项目名称:{{ project_obj.title }}</td>
            <td>环境:{{ project_obj.get_env_display }}</td>
        </tr>
        <tr>
            <td>版本:{{ task_obj.tag }}</td>
            <td>状态:{{ task_obj.get_status_display }}</td>
        </tr>
        <tr>
            <td colspan="2">仓库地址:{{ project_obj.repo }}</td>
        </tr>
        <tr>
            <td colspan="2">线上路径:{{ project_obj.path }}</td>
        </tr>
        <tr>
            <td colspan="2">
                <div>关联服务器</div>
                <ul>
                    {% for server_obj in project_obj.servers.all %}
                        <li>{{ server_obj.hostname }}</li>
                    {% endfor %}
                </ul>
            </td>
        </tr>
        </tbody>
    </table>
{#    3 图形动态展示区#}
    <div  ></div>
{% endblock %}


{% block js %}
    <script src="{% static 'js/go.js' %}"></script>
    <script>
        var ws;
        var diagram;
        function initWebSocket() {
            ws = new WebSocket('ws://127.0.0.1:8000/publish/{{ task_obj.pk }}/');

            ws.onmessage = function (event) {
                {#console.log(typeof  event.data)  // {'code':'','data':''}#}
                // 容易出错
                var res = JSON.parse(event.data)
                if (res.code==='init'){
                    diagram.model = new go.TreeModel(res.data);
                }
            }
        }
        function initTable() {
        var $ = go.GraphObject.make;
        diagram = $(go.Diagram, "diagramDiv",{
            layout: $(go.TreeLayout, {
                angle: 0,
                nodeSpacing: 20,
                layerSpacing: 70
            })
        });
        // 创建一个节点模版
        diagram.nodeTemplate = $(go.Node, "Auto",
            $(go.Shape, {
                figure: "RoundedRectangle",
                fill: 'yellow',
                stroke: 'yellow'
            }, new go.Binding("figure", "figure"), new go.Binding("fill", "color"), new go.Binding("stroke", "color")),
            $(go.TextBlock, {margin: 8}, new go.Binding("text", "text"))
        );
        // 创建一个箭头模版
        diagram.linkTemplate = $(go.Link,
            {routing: go.Link.Orthogonal},
            $(go.Shape, {stroke: 'yellow'}, new go.Binding('stroke', 'link_color')),
            $(go.Shape, {toArrow: "OpenTriangle", stroke: 'yellow'}, new go.Binding('stroke', 'link_color'))
        );
        // 这里的数据后期就可以通过后端来获取
        {#var nodeDataArray = [#}
        {#    {key: "start", text: '开始', figure: 'Ellipse', color: "lightgreen"},#}
        {#    {key: "download", parent: 'start', text: '下载代码', color: "lightgreen", link_text: '执行中...'},#}
        {#    {key: "compile", parent: 'download', text: '本地编译', color: "lightgreen"},#}
        {#    {key: "zip", parent: 'compile', text: '打包', color: "red", link_color: 'red'},#}
        {#    {key: "c1", text: '服务器1', parent: "zip"},#}
        {#    {key: "c11", text: '服务重启', parent: "c1"},#}
        {#    {key: "c2", text: '服务器2', parent: "zip"},#}
        {#    {key: "c21", text: '服务重启', parent: "c2"},#}
        {#    {key: "c3", text: '服务器3', parent: "zip"},#}
        {#    {key: "c31", text: '服务重启', parent: "c3"}#}
        {#];#}
        {#diagram.model = new go.TreeModel(nodeDataArray);#}

        // 动态控制节点颜色变化
        //var node = diagram.model.findNodeDataForKey("zip");
        // diagram.model.setDataProperty(node, "color", "lightgreen");
    }
        $(function () {
            //  页面加载完毕 直接先初始化websocket对象和图标对象
            initWebSocket()
            initTable()
        })

        function initDiagram() {
            // 朝后端要初始化图标的数据
            ws.send('init')
        }
        function releaseTask() {
            // 朝后端发送执行任务的命令
            ws.send('deploy')
        }
    </script>
{% endblock %}