使用 python 发布超清抖音视频

2021年09月15日 阅读数:1
这篇文章主要向大家介绍使用 python 发布超清抖音视频,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

如今作的项目中有个需求,用户能够在咱们平台能够管理本身的抖音号,可使用抖音的一些功能,好比发布本身的抖音视频等。经过抖音开放平台提供的一些接口能够实现该需求,并且最大支持 4个G 的视频发布,最大时长为 15 分钟。为了下降网关的压力,目前咱们是使用了分片上传。python



使用接口发布视频流程大概分为:
redis

1.调取【初始化上传】接口获得惟一值 upload_id,有效期为 2 个小时json

2.把视频大文件进行分块处理,将分块后的视频、 upload_id 和第几个分片参数带上,去调取【分片上传接口】,把视频文件上传到抖音的服务器上
服务器

3.调取 【完成视频】接口,获得 video_id微信

4.调取 【建立抖音视频】 接口,带上 video_id 参数,发布本身的抖音视频app

成功执行完上述4个步骤,就能够在抖音中看到本身上传的视频了。ide


由于 upload_id 的值会有一些特殊符号,须要对其进行 encode 一下,经过 urlencode 处理便可。post


我还增长了断点续传的功能,例如,我要上传的视频被分红了6块,可是上传3 块视频后发生了一些意外,须要从新上传视频,前三块从新请求上传时并不会真的上传到服务器上,这样减小了流量的消耗。
url


我是使用了 redis 记录已经上传的视频的信息,以 upload_id 的值为 name,建立一个 set 集合,并设置了 2 个小时有效期,上传同一个 upload_id 的分块视频把其第几分片存到 set 集合中,若是第几片的信息已存在,则不上传该分片的视频,不然的话就调取 分片上传 接口,上传服务器上。spa


初始化上传的代码:

class V1InitUploadView(APIView):
def get(self, request): """初始化上传""" user = request.login_user open_id = user.open_id flag, access_token = check_access_token(request) if flag is False: return Response({"error": "请从新登陆"}, status=status.HTTP_401_UNAUTHORIZED) init_url = "https://open.douyin.com/video/part/init/?open_id={}&access_token={}".format(open_id, access_token) init_res = requests.post(url=init_url).json() print("init_res======", init_res) if init_res.get("data", {}).get("error_code") != 0: return Response({"error": init_res.get("data", {}).get("description")}, status=status.HTTP_400_BAD_REQUEST) upload_id = init_res.get("data", {}).get("upload_id") # 得到 upload_id # 在redis 建立空集合并设置过时时间2小时 r_db.sadd(upload_id, 0) r_db.expire(upload_id, 7200) return Response({"upload_id": upload_id})


分片上传视频到服务器的代码:

class V1VideoPartUploadView(APIView):
def post(self, request): """ 分片上传文件到服务器上 :param request: :return: """ user = request.login_user open_id = user.open_id flag, access_token = check_access_token(request) if flag is False: return Response({"error": "请从新登陆"}, status=status.HTTP_401_UNAUTHORIZED) data = request.data upload_id = data.get("upload_id") part_number = data.get("part_number") upload_id_encode = urlencode({"upload_id": upload_id})
if not upload_id or r_db.ttl(upload_id) < 0: return Response({"error": "upload_id已失效"}) if not check_params_is_int(part_number): return Response({"error": "分片类型错误"}) part_number = str(part_number)
video_file = request.FILES.get("video_file") if not video_file: return Response({"error": "请上传文件"}, status=status.HTTP_200_OK) suffix = str(video_file).split(".")[-1] if suffix.lower() not in ( "avi", "wmv", "mpeg", "mp4", "m4v", "mov", "asf", "flv", "f4v", "rmvb", "rm", "3gp", "vob"): return Response({"error": "请上传avi、wmv、mpeg、mp四、m4v、mov、asf、flv、f4v、rmvb、rm、3gp、vob的文件"})
size = video_file.size if int(size) > 20971520: # 设置分块大小为 20 M return Response({"error": "视频大小不能超过20M"}) # 获取该 upload_id 的列表 part_set = {p.decode('utf8') for p in r_db.smembers(upload_id)} if part_number in part_set: return Response({"error_code": 0, "description": ""})
part_url = f"https://open.douyin.com/video/part/upload/?open_id={open_id}&access_token={access_token}" \ f"&part_number={part_number}&" + upload_id_encode part_res = requests.post(url=part_url, files={"video": video_file}).json() res = part_res.get("data", {}) print("part_res=====", part_res) if res.get("error_code") != 0: return Response({"error": res.get("description")}, status=status.HTTP_400_BAD_REQUEST)
else: r_db.sadd(upload_id, part_number) # 记录已上传的视频 return Response(res)


完成上传视频和建立视频的代码:

class V1VideoCreateView(APIView):
def post(self, request): """ 完成上传&&建立视频 :param request: :return: """ user = request.login_user open_id = user.open_id flag, access_token = check_access_token(request) if flag is False: return Response({"error": "请从新登陆"}, status=status.HTTP_401_UNAUTHORIZED) request_data = request.data text = request_data.get("text") # 视频标题, poi_id = request_data.get("poi_id") # 地理位置id poi_name = request_data.get("poi_name") # 地理位置名称 cover_tsp = request_data.get("cover_tsp") # 将传入的指定时间点对应帧设置为视频封面(单位:秒) at_users = request_data.get("at_users") upload_id = request_data.get("upload_id") upload_id_encode = urlencode({"upload_id": upload_id}) if at_users: at_users = at_users.split(",")
complete_url = f"https://open.douyin.com/video/part/complete/?open_id={open_id}&access_token={access_token}&" + upload_id_encode complete_res = requests.post(url=complete_url, data={"upload_id": upload_id}).json() print("complete_res=====", complete_res) if complete_res.get("data", {}).get("error_code") != 0: return Response({"error": complete_res.get("data", {}).get("description")}, status=status.HTTP_400_BAD_REQUEST) video_id = complete_res.get("data", {}).get("video", {}).get("video_id") # # 建立视频 create_url = "https://open.douyin.com/video/create/?open_id={}&access_token={}".format(open_id, access_token) data_create = { "video_id": video_id, "text": text, } if poi_id: data_create["poi_id"] = poi_id if poi_name: data_create["poi_name"] = poi_name if cover_tsp: data_create["cover_tsp"] = cover_tsp if at_users: at_users = at_users.split(",") data_create["at_users"] = at_users
res_create = requests.post(url=create_url, json=data_create).json() res_data = res_create.get("data") if res_data.get("error_code") != 0: return Response({"error": res_data.get("description")}) return Response(res_data)

本文分享自微信公众号 - pythonista的平常(gh_fc70d5d98d3f)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。