diff --git a/README-en.md b/README-en.md index 2cbc285..d5a962b 100644 --- a/README-en.md +++ b/README-en.md @@ -101,6 +101,7 @@ graph TD - Displays logs and upload progress - Supports **automatic speed test and selection of the best route** (default) - Supports specifying upload lines (`qn`, `bldsa`, `ws`, `bda2`, `tx`) + - Supports uploading cover image - `bilitool append` appends videos to existing videos (multi-part) - `bilitool download` downloads videos - Supports downloading with `bvid` and `avid` identifiers diff --git a/README.md b/README.md index d81c5a9..71e9333 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,7 @@ graph TD - 显示日志与上传进度 - 支持**自动测速并且选择最佳线路**(默认) - 支持指定上传线路(`qn`, `bldsa`, `ws`, `bda2`, `tx`) + - 支持上传视频封面图片 - `bilitool append` 追加视频到已有的视频(**分p投稿**) - `bilitool download` 下载视频 - 支持 `bvid` 和 `avid` 两种编号下载 diff --git a/bilitool/__init__.py b/bilitool/__init__.py index af0bf09..b091165 100644 --- a/bilitool/__init__.py +++ b/bilitool/__init__.py @@ -5,4 +5,11 @@ from .utils.get_ip_info import IPInfo from .utils.check_format import CheckFormat -__all__ = ['LoginController', 'UploadController', 'DownloadController', 'FeedController', 'IPInfo', 'CheckFormat'] \ No newline at end of file +__all__ = [ + "LoginController", + "UploadController", + "DownloadController", + "FeedController", + "IPInfo", + "CheckFormat", +] diff --git a/bilitool/authenticate/wbi_sign.py b/bilitool/authenticate/wbi_sign.py index 9c0106c..10ebbee 100644 --- a/bilitool/authenticate/wbi_sign.py +++ b/bilitool/authenticate/wbi_sign.py @@ -9,57 +9,115 @@ # https://github.com/SocialSisterYi/bilibili-API-collect/blob/master/docs/misc/sign/wbi.md + class WbiSign(object): def __init__(self): self.config = Model().get_config() mixinKeyEncTab = [ - 46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, 27, 43, 5, 49, - 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13, 37, 48, 7, 16, 24, 55, 40, - 61, 26, 17, 0, 1, 60, 51, 30, 4, 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, - 36, 20, 34, 44, 52 + 46, + 47, + 18, + 2, + 53, + 8, + 23, + 32, + 15, + 50, + 10, + 31, + 58, + 3, + 45, + 35, + 27, + 43, + 5, + 49, + 33, + 9, + 42, + 19, + 29, + 28, + 14, + 39, + 12, + 38, + 41, + 13, + 37, + 48, + 7, + 16, + 24, + 55, + 40, + 61, + 26, + 17, + 0, + 1, + 60, + 51, + 30, + 4, + 22, + 25, + 54, + 21, + 56, + 59, + 6, + 63, + 57, + 62, + 11, + 36, + 20, + 34, + 44, + 52, ] def get_wbi_keys(self) -> tuple[str, str]: """Get the refresh token""" headers = Model().get_headers_with_cookies_and_refer() - resp = requests.get('https://api.bilibili.com/x/web-interface/nav', headers=headers) + resp = requests.get( + "https://api.bilibili.com/x/web-interface/nav", headers=headers + ) resp.raise_for_status() json_content = resp.json() - img_url: str = json_content['data']['wbi_img']['img_url'] - sub_url: str = json_content['data']['wbi_img']['sub_url'] - img_key = img_url.rsplit('/', 1)[1].split('.')[0] - sub_key = sub_url.rsplit('/', 1)[1].split('.')[0] + img_url: str = json_content["data"]["wbi_img"]["img_url"] + sub_url: str = json_content["data"]["wbi_img"]["sub_url"] + img_key = img_url.rsplit("/", 1)[1].split(".")[0] + sub_key = sub_url.rsplit("/", 1)[1].split(".")[0] return img_key, sub_key def get_mixin_key(self, orig: str): """shuffle the string""" - return reduce(lambda s, i: s + orig[i], self.mixinKeyEncTab, '')[:32] + return reduce(lambda s, i: s + orig[i], self.mixinKeyEncTab, "")[:32] def enc_wbi(self, params: dict, img_key: str, sub_key: str): """wbi sign""" mixin_key = self.get_mixin_key(img_key + sub_key) curr_time = round(time.time()) - params['wts'] = curr_time + params["wts"] = curr_time params = dict(sorted(params.items())) # filter the value of "!'()*" params = { - k: ''.join(filter(lambda char: char not in "!'()*", str(v))) - for k, v - in params.items() + k: "".join(filter(lambda char: char not in "!'()*", str(v))) + for k, v in params.items() } query = urllib.parse.urlencode(params) wbi_sign = md5((query + mixin_key).encode()).hexdigest() # add `w_rid` parameter in the url - params['w_rid'] = wbi_sign + params["w_rid"] = wbi_sign return params def get_wbi_signed_params(self, params): img_key, sub_key = self.get_wbi_keys() - signed_params = self.enc_wbi( - params=params, - img_key=img_key, - sub_key=sub_key - ) + signed_params = self.enc_wbi(params=params, img_key=img_key, sub_key=sub_key) return signed_params diff --git a/bilitool/cli.py b/bilitool/cli.py index 55c165a..b7d3bfe 100644 --- a/bilitool/cli.py +++ b/bilitool/cli.py @@ -5,20 +5,30 @@ import os import logging import textwrap -from bilitool import LoginController, UploadController, DownloadController, FeedController, IPInfo, CheckFormat +from bilitool import ( + LoginController, + UploadController, + DownloadController, + FeedController, + IPInfo, + CheckFormat, +) + def cli(): logging.basicConfig( - format='[%(levelname)s] - [%(asctime)s %(name)s] - %(message)s', - level=logging.INFO + format="[%(levelname)s] - [%(asctime)s %(name)s] - %(message)s", + level=logging.INFO, ) parser = argparse.ArgumentParser( prog="bilitool", formatter_class=argparse.RawDescriptionHelpFormatter, - description=textwrap.dedent(''' + description=textwrap.dedent( + """ The Python toolkit package and cli designed for interaction with Bilibili. Source code at https://github.com/timerring/bilitool - '''), + """ + ), ) parser.add_argument( "-V", @@ -28,64 +38,142 @@ def cli(): help="Print version information", ) - subparsers = parser.add_subparsers(dest='subcommand', help='Subcommands') + subparsers = parser.add_subparsers(dest="subcommand", help="Subcommands") # Login subcommand - login_parser = subparsers.add_parser('login', help='Login and save the cookie') - login_parser.add_argument('-f', '--file', default='', help='(default is empty) Login via cookie file') - login_parser.add_argument('--export', action='store_true', help='(default is false) Export the login cookie file') + login_parser = subparsers.add_parser("login", help="Login and save the cookie") + login_parser.add_argument( + "-f", "--file", default="", help="(default is empty) Login via cookie file" + ) + login_parser.add_argument( + "--export", + action="store_true", + help="(default is false) Export the login cookie file", + ) # Logout subcommand - logout_parser = subparsers.add_parser('logout', help='Logout the current account') + logout_parser = subparsers.add_parser("logout", help="Logout the current account") # Upload subcommand - upload_parser = subparsers.add_parser('upload', help='Upload the video') - upload_parser.add_argument('video_path', help='(required) The path to video file') - upload_parser.add_argument('-y', '--yaml', default='', help='The path to yaml file(if yaml file is provided, the arguments below will be ignored)') - upload_parser.add_argument('--copyright', type=int, default=2, help='(default is 2) 1 for original, 2 for reprint') - upload_parser.add_argument('--title', default='', help='(default is video name) The title of video') - upload_parser.add_argument('--desc', default='', help='(default is empty) The description of video') - upload_parser.add_argument('--tid', type=int, default=138, help='(default is 138) For more info to the type id, refer to https://biliup.github.io/tid-ref.html') - upload_parser.add_argument('--tag', default='bilitool', help='(default is bilitool) Video tags, separated by comma') - upload_parser.add_argument('--source', default='来源于网络', help='(default is 来源于网络) The source of video (if your video is re-print)') - upload_parser.add_argument('--cover', default='', help='(default is empty) The cover of video (if you want to customize, set it as the path to your cover image)') - upload_parser.add_argument('--dynamic', default='', help='(default is empty) The dynamic information') - upload_parser.add_argument('--cdn', default='', help='(default is auto detect) The cdn line') + upload_parser = subparsers.add_parser("upload", help="Upload the video") + upload_parser.add_argument("video_path", help="(required) The path to video file") + upload_parser.add_argument( + "-y", + "--yaml", + default="", + help="The path to yaml file(if yaml file is provided, the arguments below will be ignored)", + ) + upload_parser.add_argument( + "--copyright", + type=int, + default=2, + help="(default is 2) 1 for original, 2 for reprint", + ) + upload_parser.add_argument( + "--title", default="", help="(default is video name) The title of video" + ) + upload_parser.add_argument( + "--desc", default="", help="(default is empty) The description of video" + ) + upload_parser.add_argument( + "--tid", + type=int, + default=138, + help="(default is 138) For more info to the type id, refer to https://bilitool.timerring.com/tid.html", + ) + upload_parser.add_argument( + "--tag", + default="bilitool", + help="(default is bilitool) Video tags, separated by comma", + ) + upload_parser.add_argument( + "--source", + default="来源于网络", + help="(default is 来源于网络) The source of video (if your video is re-print)", + ) + upload_parser.add_argument( + "--cover", + default="", + help="(default is empty) The cover of video (if you want to customize, set it as the path to your cover image)", + ) + upload_parser.add_argument( + "--dynamic", default="", help="(default is empty) The dynamic information" + ) + upload_parser.add_argument( + "--cdn", default="", help="(default is auto detect) The cdn line" + ) # Append subcommand - append_parser = subparsers.add_parser('append', help='Append the video') - append_parser.add_argument('-v','--vid', required=True, help='(required) The bvid or avid of appended video') - append_parser.add_argument('video_path', help='(required) The path to video file') - append_parser.add_argument('--cdn', default='', help='(default is auto detect) The cdn line') + append_parser = subparsers.add_parser("append", help="Append the video") + append_parser.add_argument( + "-v", + "--vid", + required=True, + help="(required) The bvid or avid of appended video", + ) + append_parser.add_argument("video_path", help="(required) The path to video file") + append_parser.add_argument( + "--cdn", default="", help="(default is auto detect) The cdn line" + ) # Check login subcommand - check_login_parser = subparsers.add_parser('check', help='Check if the user is logged in') + check_login_parser = subparsers.add_parser( + "check", help="Check if the user is logged in" + ) # Download subcommand - download_parser = subparsers.add_parser('download', help='Download the video') + download_parser = subparsers.add_parser("download", help="Download the video") - download_parser.add_argument('vid', help='(required) the bvid or avid of video') - download_parser.add_argument('--danmaku', action='store_true', help='(default is false) download the danmaku of video') - download_parser.add_argument('--quality', type=int, default=64, help='(default is 64) the resolution of video') - download_parser.add_argument('--chunksize', type=int, default=1024, help='(default is 1024) the chunk size of video') - download_parser.add_argument('--multiple', action='store_true', help='(default is false) download the multiple videos if have set') + download_parser.add_argument("vid", help="(required) the bvid or avid of video") + download_parser.add_argument( + "--danmaku", + action="store_true", + help="(default is false) download the danmaku of video", + ) + download_parser.add_argument( + "--quality", + type=int, + default=64, + help="(default is 64) the resolution of video", + ) + download_parser.add_argument( + "--chunksize", + type=int, + default=1024, + help="(default is 1024) the chunk size of video", + ) + download_parser.add_argument( + "--multiple", + action="store_true", + help="(default is false) download the multiple videos if have set", + ) # List subcommand - list_parser = subparsers.add_parser('list', help='Get the uploaded video list') - list_parser.add_argument('--size', type=int, default=20, help='(default is 20) the size of video list') - list_parser.add_argument('--status', default='pubed,not_pubed,is_pubing', help='(default is all) the status of video list: pubed, not_pubed, is_pubing') + list_parser = subparsers.add_parser("list", help="Get the uploaded video list") + list_parser.add_argument( + "--size", type=int, default=20, help="(default is 20) the size of video list" + ) + list_parser.add_argument( + "--status", + default="pubed,not_pubed,is_pubing", + help="(default is all) the status of video list: pubed, not_pubed, is_pubing", + ) # Show subcommand - show_parser = subparsers.add_parser('show', help='Show the video detailed info') - show_parser.add_argument('vid', help='The avid or bvid of the video') + show_parser = subparsers.add_parser("show", help="Show the video detailed info") + show_parser.add_argument("vid", help="The avid or bvid of the video") # Convert subcommand - convert_parser = subparsers.add_parser('convert', help='Convert between avid and bvid') - convert_parser.add_argument('vid', help='The avid or bvid of the video') + convert_parser = subparsers.add_parser( + "convert", help="Convert between avid and bvid" + ) + convert_parser.add_argument("vid", help="The avid or bvid of the video") # IP subcommand - ip_parser = subparsers.add_parser('ip', help='Get the ip info') - ip_parser.add_argument('--ip', default='', help='(default is your request ip) The ip address') + ip_parser = subparsers.add_parser("ip", help="Get the ip info") + ip_parser.add_argument( + "--ip", default="", help="(default is your request ip) The ip address" + ) args = parser.parse_args() @@ -95,41 +183,55 @@ def cli(): parser.print_help() sys.exit() - if args.subcommand == 'login': + if args.subcommand == "login": if args.file: LoginController().login_bilibili_with_cookie_file(args.file) else: LoginController().login_bilibili(args.export) - if args.subcommand == 'logout': + if args.subcommand == "logout": LoginController().logout_bilibili() - if args.subcommand == 'check': + if args.subcommand == "check": LoginController().check_bilibili_login() - if args.subcommand == 'upload': + if args.subcommand == "upload": # print(args) - UploadController().upload_video_entry(args.video_path, args.yaml, args.copyright, - args.tid, args.title, args.desc, args.tag, args.source, args.cover, args.dynamic, args.cdn) - - if args.subcommand == 'append': + UploadController().upload_video_entry( + args.video_path, + args.yaml, + args.copyright, + args.tid, + args.title, + args.desc, + args.tag, + args.source, + args.cover, + args.dynamic, + args.cdn, + ) + + if args.subcommand == "append": UploadController().append_video_entry(args.video_path, args.vid, args.cdn) - if args.subcommand == 'download': + if args.subcommand == "download": # print(args) - DownloadController().download_video_entry(args.vid, args.danmaku, args.quality, args.chunksize, args.multiple) - - if args.subcommand == 'list': + DownloadController().download_video_entry( + args.vid, args.danmaku, args.quality, args.chunksize, args.multiple + ) + + if args.subcommand == "list": FeedController().print_video_list_info(args.size, args.status) - - if args.subcommand == 'show': + + if args.subcommand == "show": FeedController().print_video_info(args.vid) - if args.subcommand == 'convert': + if args.subcommand == "convert": CheckFormat().convert_bv_and_av(args.vid) - - if args.subcommand == 'ip': + + if args.subcommand == "ip": IPInfo.get_ip_address(args.ip) -if __name__ == '__main__': - cli() \ No newline at end of file + +if __name__ == "__main__": + cli() diff --git a/bilitool/controller/download_controller.py b/bilitool/controller/download_controller.py index 551be57..0e06085 100644 --- a/bilitool/controller/download_controller.py +++ b/bilitool/controller/download_controller.py @@ -9,14 +9,14 @@ class DownloadController: def __init__(self): - self.logger = logging.getLogger('bilitool') + self.logger = logging.getLogger("bilitool") self.model = Model() self.bili_downloader = BiliDownloader(self.logger) self.config = self.model.get_config() def extract_filename(self, filename): illegal_chars = r'[\\/:"*?<>|]' - filename = re.sub(illegal_chars, '', filename) + filename = re.sub(illegal_chars, "", filename) return filename @staticmethod @@ -25,7 +25,7 @@ def package_download_metadata(danmaku, quality, chunksize, multiple): "danmaku": danmaku, "quality": quality, "chunksize": chunksize, - "multiple": multiple + "multiple": multiple, } def get_cid(self, bvid): @@ -35,14 +35,14 @@ def download_video(self, bvid): cid_group = self.get_cid(bvid) if self.config["download"]["multiple"]: for i in range(0, len(cid_group)): - cid = str(cid_group[i]['cid']) - name = cid_group[i]['part'] - self.logger.info(f'Begin download {name}') + cid = str(cid_group[i]["cid"]) + name = cid_group[i]["part"] + self.logger.info(f"Begin download {name}") self.download_biv_and_danmaku(bvid, cid, name) else: - cid = str(cid_group[0]['cid']) - name = cid_group[0]['part'] - self.logger.info(f'Begin download {name}') + cid = str(cid_group[0]["cid"]) + name = cid_group[0]["part"] + self.logger.info(f"Begin download {name}") self.download_biv_and_danmaku(bvid, cid, name) def download_biv_and_danmaku(self, bvid, cid, name_raw="video"): @@ -54,8 +54,9 @@ def download_danmaku(self, cid, name="video"): self.bili_downloader.download_danmaku(cid, name) def download_video_entry(self, vid, danmaku, quality, chunksize, multiple): - download_metadata = self.package_download_metadata(danmaku, quality, chunksize, multiple) - Model().update_multiple_config('download', download_metadata) + download_metadata = self.package_download_metadata( + danmaku, quality, chunksize, multiple + ) + Model().update_multiple_config("download", download_metadata) bvid = CheckFormat().only_bvid(vid) self.download_video(bvid) - \ No newline at end of file diff --git a/bilitool/controller/feed_controller.py b/bilitool/controller/feed_controller.py index c4f04c6..3e5e40e 100644 --- a/bilitool/controller/feed_controller.py +++ b/bilitool/controller/feed_controller.py @@ -3,19 +3,23 @@ from ..feed.bili_video_list import BiliVideoList from ..utils.check_format import CheckFormat + class FeedController(object): def __init__(self): self.bili_video_list = BiliVideoList() - - def print_video_list_info(self, size: int = 20, status_type: str = 'pubed,not_pubed,is_pubing'): + def print_video_list_info( + self, size: int = 20, status_type: str = "pubed,not_pubed,is_pubing" + ): self.bili_video_list.print_video_list_info(size, status_type) - + def print_video_info(self, vid: str): bvid = CheckFormat().only_bvid(vid) video_info = self.bili_video_list.get_video_info(bvid) extracted_info = self.bili_video_list.extract_video_info(video_info) self.bili_video_list.print_video_info(extracted_info) - def get_video_dict_info(self, size: int = 20, status_type: str = 'pubed,not_pubed,is_pubing'): + def get_video_dict_info( + self, size: int = 20, status_type: str = "pubed,not_pubed,is_pubing" + ): return self.bili_video_list.get_video_dict_info(size, status_type) diff --git a/bilitool/controller/login_controller.py b/bilitool/controller/login_controller.py index e2b2d73..15683d3 100644 --- a/bilitool/controller/login_controller.py +++ b/bilitool/controller/login_controller.py @@ -13,9 +13,11 @@ def __init__(self): self.login_bili = LoginBili() self.logout_bili = LogoutBili() self.check_bili_login = CheckBiliLogin() - + def login_bilibili(self, export): - input("Please maximize the window to ensure the QR code is fully displayed, press Enter to continue: ") + input( + "Please maximize the window to ensure the QR code is fully displayed, press Enter to continue: " + ) login_url, auth_code = self.login_bili.get_tv_qrcode_url_and_auth_code() qr = qrcode.QRCode() qr.add_data(login_url) diff --git a/bilitool/controller/upload_controller.py b/bilitool/controller/upload_controller.py index ac3a848..5b807ab 100644 --- a/bilitool/controller/upload_controller.py +++ b/bilitool/controller/upload_controller.py @@ -11,21 +11,23 @@ class UploadController: def __init__(self): - self.logger = logging.getLogger('bilitool') + self.logger = logging.getLogger("bilitool") self.bili_uploader = BiliUploader(self.logger) @staticmethod - def package_upload_metadata(copyright, tid, title, desc, tag, source, cover, dynamic): - return { - 'copyright': copyright, - 'tid': tid, - 'title': title, - 'desc': desc, - 'tag': tag, - 'source': source, - 'cover': cover, - 'dynamic': dynamic - } + def package_upload_metadata( + copyright, tid, title, desc, tag, source, cover, dynamic + ): + return { + "copyright": copyright, + "tid": tid, + "title": title, + "desc": desc, + "tag": tag, + "source": source, + "cover": cover, + "dynamic": dynamic, + } def upload_video(self, file, cdn=None): """upload and publish video on bilibili""" @@ -47,31 +49,35 @@ def upload_video(self, file, cdn=None): else: upos_url, cdn, probe_version = self.bili_uploader.probe() file = Path(file) - assert file.exists(), f'The file {file} does not exist' + assert file.exists(), f"The file {file} does not exist" filename = file.name title = Model().get_config()["upload"]["title"] or file.stem Model().update_specific_config("upload", "title", title) filesize = file.stat().st_size - self.logger.info(f'The {title} to be uploaded') + self.logger.info(f"The {title} to be uploaded") # upload video - self.logger.info('Start preuploading the video') - pre_upload_response = self.bili_uploader.preupload(filename=filename, filesize=filesize, cdn=cdn, probe_version=probe_version) - upos_uri = pre_upload_response['upos_uri'].split('//')[-1] - auth = pre_upload_response['auth'] - biz_id = pre_upload_response['biz_id'] - chunk_size = pre_upload_response['chunk_size'] - chunks = ceil(filesize/chunk_size) + self.logger.info("Start preuploading the video") + pre_upload_response = self.bili_uploader.preupload( + filename=filename, filesize=filesize, cdn=cdn, probe_version=probe_version + ) + upos_uri = pre_upload_response["upos_uri"].split("//")[-1] + auth = pre_upload_response["auth"] + biz_id = pre_upload_response["biz_id"] + chunk_size = pre_upload_response["chunk_size"] + chunks = ceil(filesize / chunk_size) - self.logger.info('Start uploading the video') - upload_video_id_response = self.bili_uploader.get_upload_video_id(upos_uri=upos_uri, auth=auth, upos_url=upos_url) - upload_id = upload_video_id_response['upload_id'] - key = upload_video_id_response['key'] + self.logger.info("Start uploading the video") + upload_video_id_response = self.bili_uploader.get_upload_video_id( + upos_uri=upos_uri, auth=auth, upos_url=upos_url + ) + upload_id = upload_video_id_response["upload_id"] + key = upload_video_id_response["key"] - bilibili_filename = re.search(r'/(.*)\.', key).group(1) + bilibili_filename = re.search(r"/(.*)\.", key).group(1) - self.logger.info(f'Uploading the video in {chunks} batches') - fileio = file.open(mode='rb') + self.logger.info(f"Uploading the video in {chunks} batches") + fileio = file.open(mode="rb") self.bili_uploader.upload_video_in_chunks( upos_uri=upos_uri, auth=auth, @@ -80,25 +86,34 @@ def upload_video(self, file, cdn=None): filesize=filesize, chunk_size=chunk_size, chunks=chunks, - upos_url=upos_url + upos_url=upos_url, ) fileio.close() # notify the all chunks have been uploaded - self.bili_uploader.finish_upload(upos_uri=upos_uri, auth=auth, filename=filename, - upload_id=upload_id, biz_id=biz_id, chunks=chunks, upos_url=upos_url) + self.bili_uploader.finish_upload( + upos_uri=upos_uri, + auth=auth, + filename=filename, + upload_id=upload_id, + biz_id=biz_id, + chunks=chunks, + upos_url=upos_url, + ) return bilibili_filename def publish_video(self, file, cdn=None): bilibili_filename = self.upload_video(file, cdn) # publish video - publish_video_response = self.bili_uploader.publish_video(bilibili_filename=bilibili_filename) - if publish_video_response['code'] == 0: - bvid = publish_video_response['data']['bvid'] - self.logger.info(f'upload success!\tbvid:{bvid}') + publish_video_response = self.bili_uploader.publish_video( + bilibili_filename=bilibili_filename + ) + if publish_video_response["code"] == 0: + bvid = publish_video_response["data"]["bvid"] + self.logger.info(f"upload success!\tbvid:{bvid}") return True else: - self.logger.error(publish_video_response['message']) + self.logger.error(publish_video_response["message"]) return False # reset the video title Model().reset_upload_config() @@ -107,25 +122,42 @@ def append_video_entry(self, video_path, bvid, cdn=None, video_name=None): bilibili_filename = self.upload_video(video_path, cdn) video_name = video_name if video_name else Path(video_path).name.strip(".mp4") video_data = self.bili_uploader.get_video_list_info(bvid) - response = self.bili_uploader.append_video(bilibili_filename, video_name, video_data).json() - if response['code'] == 0: - self.logger.info(f'append {video_name} to {bvid} success!') + response = self.bili_uploader.append_video( + bilibili_filename, video_name, video_data + ).json() + if response["code"] == 0: + self.logger.info(f"append {video_name} to {bvid} success!") return True else: - self.logger.error(response['message']) + self.logger.error(response["message"]) return False # reset the video title Model().reset_upload_config() - def upload_video_entry(self, video_path, yaml, copyright, tid, title, desc, tag, source, cover, dynamic, cdn=None): + def upload_video_entry( + self, + video_path, + yaml, + copyright, + tid, + title, + desc, + tag, + source, + cover, + dynamic, + cdn=None, + ): if yaml: # * is used to unpack the tuple upload_metadata = self.package_upload_metadata(*parse_yaml(yaml)) else: upload_metadata = self.package_upload_metadata( - copyright, tid, title, - desc, tag, source, cover, dynamic + copyright, tid, title, desc, tag, source, cover, dynamic + ) + if upload_metadata["cover"]: + upload_metadata["cover"] = self.bili_uploader.cover_up( + upload_metadata["cover"] ) - Model().update_multiple_config('upload', upload_metadata) + Model().update_multiple_config("upload", upload_metadata) return self.publish_video(video_path, cdn) - \ No newline at end of file diff --git a/bilitool/download/bili_download.py b/bilitool/download/bili_download.py index 3625622..b909c17 100644 --- a/bilitool/download/bili_download.py +++ b/bilitool/download/bili_download.py @@ -13,39 +13,49 @@ def __init__(self, logger) -> None: self.config = Model().get_config() self.headers = Model().get_headers_with_cookies_and_refer() - def get_cid(self,bvid): - url="https://api.bilibili.com/x/player/pagelist?bvid="+bvid + def get_cid(self, bvid): + url = "https://api.bilibili.com/x/player/pagelist?bvid=" + bvid response = requests.get(url, headers=self.headers) - return response.json()['data'] + return response.json()["data"] def get_bvid_video(self, bvid, cid, name_raw="video"): - url = "https://api.bilibili.com/x/player/playurl?cid="+str(cid)+"&bvid="+bvid+"&qn="+str(self.config["download"]["quality"]) - name = name_raw+'.mp4' + url = ( + "https://api.bilibili.com/x/player/playurl?cid=" + + str(cid) + + "&bvid=" + + bvid + + "&qn=" + + str(self.config["download"]["quality"]) + ) + name = name_raw + ".mp4" response = None response = requests.get(url, headers=self.headers) video_url = response.json()["data"]["durl"][0]["url"] self.download_video(video_url, name) def download_video(self, url, name): - response = requests.get( - url, headers=self.headers, stream=True) + response = requests.get(url, headers=self.headers, stream=True) if response.status_code == 200: - with open(name, 'wb') as file: - content_length = int(response.headers['Content-Length']) - progress_bar = tqdm(total=content_length, unit='B', unit_scale=True, desc=name) - for chunk in response.iter_content(chunk_size=self.config["download"]["chunksize"]): + with open(name, "wb") as file: + content_length = int(response.headers["Content-Length"]) + progress_bar = tqdm( + total=content_length, unit="B", unit_scale=True, desc=name + ) + for chunk in response.iter_content( + chunk_size=self.config["download"]["chunksize"] + ): file.write(chunk) progress_bar.update(len(chunk)) progress_bar.close() - self.logger.info(f'Download completed') + self.logger.info(f"Download completed") else: - self.logger.info(f'{name} Download failed') + self.logger.info(f"{name} Download failed") def download_danmaku(self, cid, name_raw="video"): if self.config["download"]["danmaku"]: - self.logger.info(f'Begin download danmaku') - dm_url = "https://comment.bilibili.com/"+cid+".xml" + self.logger.info(f"Begin download danmaku") + dm_url = "https://comment.bilibili.com/" + cid + ".xml" response = requests.get(dm_url, headers=self.headers) - with open(name_raw+'.xml', 'wb') as file: + with open(name_raw + ".xml", "wb") as file: file.write(response.content) - self.logger.info(f'Successfully downloaded danmaku') + self.logger.info(f"Successfully downloaded danmaku") diff --git a/bilitool/feed/__init__.py b/bilitool/feed/__init__.py index 0618058..083a5fd 100644 --- a/bilitool/feed/__init__.py +++ b/bilitool/feed/__init__.py @@ -1,17 +1,19 @@ # Copyright (c) 2025 bilitool + def VideoListInfo(): return { "bvid": str(), - "title": 'video title', - "state_desc": '', + "title": "video title", + "state_desc": "", # the status detail "state": 0, - "reject_reason": '', + "reject_reason": "", # the overview of status 0: pass review 1: reviewing 2: rejected 3: clash 4: the codec issue - "state_panel": 0 + "state_panel": 0, } + # https://github.com/SocialSisterYi/bilibili-API-collect/blob/e5fbfed42807605115c6a9b96447f6328ca263c5/docs/video/attribute_data.md?plain=1#L44 state_dict = { 1: "橙色通过", @@ -36,23 +38,23 @@ def VideoListInfo(): -30: "创建已提交", -40: "定时发布", -50: "仅UP主可见", - -100: "用户删除" + -100: "用户删除", } video_info_dict = { - 'title': '标题', - 'desc': '描述', - 'duration': '时长', - 'pubdate': '发布日期', - 'owner_name': '作者名称', - 'tname': '分区', - 'copyright': '版权', - 'width': '宽', - 'height': '高', - 'stat_view': '观看数', - 'stat_danmaku': '弹幕数', - 'stat_reply': '评论数', - 'stat_coin': '硬币数', - 'stat_share': '分享数', - 'stat_like': '点赞数' -} \ No newline at end of file + "title": "标题", + "desc": "描述", + "duration": "时长", + "pubdate": "发布日期", + "owner_name": "作者名称", + "tname": "分区", + "copyright": "版权", + "width": "宽", + "height": "高", + "stat_view": "观看数", + "stat_danmaku": "弹幕数", + "stat_reply": "评论数", + "stat_coin": "硬币数", + "stat_share": "分享数", + "stat_like": "点赞数", +} diff --git a/bilitool/feed/bili_live_list.py b/bilitool/feed/bili_live_list.py index bf13609..d7304c5 100644 --- a/bilitool/feed/bili_live_list.py +++ b/bilitool/feed/bili_live_list.py @@ -2,6 +2,7 @@ import requests + class BiliLiveList: def __init__(self, headers): self.headers = headers @@ -9,14 +10,15 @@ def __init__(self, headers): def get_live_info(self, room) -> dict: """Get the live info of the room""" - url = "https://api.live.bilibili.com/room/v1/Room/get_info?room_id={room}".format( - room=room + url = ( + "https://api.live.bilibili.com/room/v1/Room/get_info?room_id={room}".format( + room=room + ) ) response = requests.get(url=url, headers=self.headers) if response.status_code != 200: - raise Exception('HTTP ERROR') + raise Exception("HTTP ERROR") response_json = response.json().get("data") if response.json().get("code") != 0: raise Exception(response.json().get("message")) return response_json - diff --git a/bilitool/feed/bili_video_list.py b/bilitool/feed/bili_video_list.py index 5ec46eb..9ab9262 100644 --- a/bilitool/feed/bili_video_list.py +++ b/bilitool/feed/bili_video_list.py @@ -17,17 +17,19 @@ def save_video_list_info(archive: dict): Save the video info """ info = VideoListInfo() - info['bvid'] = archive.get("bvid") - info['title'] = archive.get("title") - info['state'] = archive.get("state") - info['state_desc'] = archive.get("state_desc") - info['reject_reason'] = archive.get('reject_reason') - info['state_panel'] = archive.get('state_panel') + info["bvid"] = archive.get("bvid") + info["title"] = archive.get("title") + info["state"] = archive.get("state") + info["state_desc"] = archive.get("state_desc") + info["reject_reason"] = archive.get("reject_reason") + info["state_panel"] = archive.get("state_panel") return info - def get_bili_video_list(self, size: int = 20, status_type: str = 'pubed,not_pubed,is_pubing'): + def get_bili_video_list( + self, size: int = 20, status_type: str = "pubed,not_pubed,is_pubing" + ): """Query the video list - + :param size: page size :param status_type: pubed,not_pubed,is_pubing """ @@ -39,37 +41,43 @@ def get_bili_video_list(self, size: int = 20, status_type: str = 'pubed,not_pube if resp.json().get("code") != 0: raise Exception(resp.json().get("message")) arc_items = list() - page_info = response_data.get('1') + page_info = response_data.get("1") if response_data.get("arc_audits") is not None: for item in response_data.get("arc_audits"): - archive = item['Archive'] - for i, v in enumerate(item['Videos']): - if v['reject_reason'] != '': - archive['reject_reason'] += "\nP{p_num}-{r}".format(p_num=i + 1, r=v['reject_reason']) + archive = item["Archive"] + for i, v in enumerate(item["Videos"]): + if v["reject_reason"] != "": + archive["reject_reason"] += "\nP{p_num}-{r}".format( + p_num=i + 1, r=v["reject_reason"] + ) arc_items.append(self.save_video_list_info(archive)) data: dict = { "page": page_info, - "status": response_data.get('class'), - "items": arc_items + "status": response_data.get("class"), + "items": arc_items, } return data - def print_video_list_info(self, size: int = 20, status_type: str = 'pubed,not_pubed,is_pubing'): + def print_video_list_info( + self, size: int = 20, status_type: str = "pubed,not_pubed,is_pubing" + ): video_data = self.get_bili_video_list(size, status_type) - for item in video_data['items']: + for item in video_data["items"]: info = f"{item['state_desc']} | {item['bvid']} | {item['title']}" extra_info = [] - if item['reject_reason']: + if item["reject_reason"]: extra_info.append(f"拒绝原因: {item['reject_reason']}") if extra_info: info += f" | {' | '.join(extra_info)}" print(info) - def get_video_dict_info(self, size: int = 20, status_type: str = 'pubed,not_pubed,is_pubing'): + def get_video_dict_info( + self, size: int = 20, status_type: str = "pubed,not_pubed,is_pubing" + ): video_data = self.get_bili_video_list(size, status_type) data = dict() - for item in video_data['items']: - data[item['title']] = item['bvid'] + for item in video_data["items"]: + data[item["title"]] = item["bvid"] return data def get_video_info(self, bvid: str) -> dict: @@ -77,43 +85,43 @@ def get_video_info(self, bvid: str) -> dict: url = f"https://api.bilibili.com/x/web-interface/view?bvid={bvid}" resp = requests.get(url=url, headers=self.headers) if resp.status_code != 200: - raise Exception('HTTP ERROR') + raise Exception("HTTP ERROR") return resp.json() @staticmethod def extract_video_info(response_data): - data = response_data.get('data', {}) - + data = response_data.get("data", {}) + video_info = { # video info - 'title': data.get('title'), - 'desc': data.get('desc'), - 'duration': data.get('duration'), - 'pubdate': data.get('pubdate'), - 'owner_name': data.get('owner', {}).get('name'), - 'tname': data.get('tname'), - 'copyright': data.get('copyright'), - 'width': data.get('dimension', {}).get('width'), - 'height': data.get('dimension', {}).get('height'), + "title": data.get("title"), + "desc": data.get("desc"), + "duration": data.get("duration"), + "pubdate": data.get("pubdate"), + "owner_name": data.get("owner", {}).get("name"), + "tname": data.get("tname"), + "copyright": data.get("copyright"), + "width": data.get("dimension", {}).get("width"), + "height": data.get("dimension", {}).get("height"), # video status - 'stat_view': data.get('stat', {}).get('view'), - 'stat_danmaku': data.get('stat', {}).get('danmaku'), - 'stat_reply': data.get('stat', {}).get('reply'), - 'stat_coin': data.get('stat', {}).get('coin'), - 'stat_share': data.get('stat', {}).get('share'), - 'stat_like': data.get('stat', {}).get('like') + "stat_view": data.get("stat", {}).get("view"), + "stat_danmaku": data.get("stat", {}).get("danmaku"), + "stat_reply": data.get("stat", {}).get("reply"), + "stat_coin": data.get("stat", {}).get("coin"), + "stat_share": data.get("stat", {}).get("share"), + "stat_like": data.get("stat", {}).get("like"), } - + return video_info - + @staticmethod def print_video_info(video_info): for key, value in video_info.items(): - if key == 'duration': - value = f"{value // 60}:{value % 60}" - elif key == 'pubdate': + if key == "duration": + value = f"{value // 60}:{value % 60}" + elif key == "pubdate": value = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(value)) - elif key == 'copyright': - value = '原创' if value == 1 else '转载' + elif key == "copyright": + value = "原创" if value == 1 else "转载" label = video_info_dict.get(key, key) print(f"{label}: {value}") diff --git a/bilitool/login/check_bili_login.py b/bilitool/login/check_bili_login.py index cd37243..c60ade0 100644 --- a/bilitool/login/check_bili_login.py +++ b/bilitool/login/check_bili_login.py @@ -4,23 +4,26 @@ import requests import json + class CheckBiliLogin(object): def __init__(self): self.config = Model().get_config() def check_bili_login(self): - url = 'https://api.bilibili.com/x/web-interface/nav' + url = "https://api.bilibili.com/x/web-interface/nav" with requests.Session() as session: session.headers = self.config["headers"] session.cookies = requests.utils.cookiejar_from_dict(self.config["cookies"]) response = session.get(url) if response.status_code == 200: response_data = json.loads(response.text) - if response_data['data']['isLogin'] == True: + if response_data["data"]["isLogin"] == True: self.obtain_bili_login_info(response_data) return True else: - print("There is currently no login account, some functions may not work") + print( + "There is currently no login account, some functions may not work" + ) # print(response.text) return False else: @@ -29,9 +32,9 @@ def check_bili_login(self): return False def obtain_bili_login_info(self, response_data): - current_level = response_data['data']['level_info']['current_level'] - uname = response_data['data']['uname'] - vip_status = response_data['data']['vipStatus'] + current_level = response_data["data"]["level_info"]["current_level"] + uname = response_data["data"]["uname"] + vip_status = response_data["data"]["vipStatus"] print(f"Current account: {uname}") print(f"Current level: {current_level}") diff --git a/bilitool/login/login_bili.py b/bilitool/login/login_bili.py index 87d3d00..d959ac0 100644 --- a/bilitool/login/login_bili.py +++ b/bilitool/login/login_bili.py @@ -15,12 +15,12 @@ def __init__(self): self.APP_SEC = "59b43e04ad6965f34319062b478f83dd" def signature(self, params): - params['appkey'] = self.APP_KEY + params["appkey"] = self.APP_KEY keys = sorted(params.keys()) - query = '&'.join(f"{k}={params[k]}" for k in keys) + query = "&".join(f"{k}={params[k]}" for k in keys) query += self.APP_SEC - md5_hash = hashlib.md5(query.encode('utf-8')).hexdigest() - params['sign'] = md5_hash + md5_hash = hashlib.md5(query.encode("utf-8")).hexdigest() + params["sign"] = md5_hash @staticmethod def map_to_string(params): @@ -29,7 +29,7 @@ def map_to_string(params): def execute_curl_command(self, api, data): data_string = LoginBili.map_to_string(data) headers = "Content-Type: application/x-www-form-urlencoded" - curl_command = f"curl -X POST -H \"{headers}\" -d \"{data_string}\" {api}" + curl_command = f'curl -X POST -H "{headers}" -d "{data_string}" {api}' result = subprocess.run( curl_command, shell=True, capture_output=True, text=True, encoding="utf-8" ) @@ -39,60 +39,70 @@ def execute_curl_command(self, api, data): def get_tv_qrcode_url_and_auth_code(self): api = "https://passport.bilibili.com/x/passport-tv-login/qrcode/auth_code" - data = { - "local_id": "0", - "ts": str(int(time.time())) - } + data = {"local_id": "0", "ts": str(int(time.time()))} self.signature(data) body = self.execute_curl_command(api, data) - if body['code'] == 0: - qrcode_url = body['data']['url'] - auth_code = body['data']['auth_code'] + if body["code"] == 0: + qrcode_url = body["data"]["url"] + auth_code = body["data"]["auth_code"] return qrcode_url, auth_code else: raise Exception("get_tv_qrcode_url_and_auth_code error") def verify_login(self, auth_code, export): api = "https://passport.bilibili.com/x/passport-tv-login/qrcode/poll" - data = { - "auth_code": auth_code, - "local_id": "0", - "ts": str(int(time.time())) - } + data = {"auth_code": auth_code, "local_id": "0", "ts": str(int(time.time()))} self.signature(data) while True: body = self.execute_curl_command(api, data) - if body['code'] == 0: + if body["code"] == 0: filename = "cookie.json" if export: - with open(filename, 'w', encoding='utf-8') as f: + with open(filename, "w", encoding="utf-8") as f: json.dump(body, f, ensure_ascii=False, indent=4) print(f"cookie has been saved to {filename}") - access_key_value = body['data']['access_token'] - sessdata_value = body['data']['cookie_info']['cookies'][0]['value'] - bili_jct_value = body['data']['cookie_info']['cookies'][1]['value'] - dede_user_id_value = body['data']['cookie_info']['cookies'][2]['value'] - dede_user_id_ckmd5_value = body['data']['cookie_info']['cookies'][3]['value'] - sid_value = body['data']['cookie_info']['cookies'][4]['value'] - Model().save_cookies_info(access_key_value, sessdata_value, bili_jct_value, dede_user_id_value, dede_user_id_ckmd5_value, sid_value) + access_key_value = body["data"]["access_token"] + sessdata_value = body["data"]["cookie_info"]["cookies"][0]["value"] + bili_jct_value = body["data"]["cookie_info"]["cookies"][1]["value"] + dede_user_id_value = body["data"]["cookie_info"]["cookies"][2]["value"] + dede_user_id_ckmd5_value = body["data"]["cookie_info"]["cookies"][3][ + "value" + ] + sid_value = body["data"]["cookie_info"]["cookies"][4]["value"] + Model().save_cookies_info( + access_key_value, + sessdata_value, + bili_jct_value, + dede_user_id_value, + dede_user_id_ckmd5_value, + sid_value, + ) print("Login success!") break else: time.sleep(3) def get_cookie_file_login(self, filename): - with open(filename, 'r', encoding='utf-8') as f: + with open(filename, "r", encoding="utf-8") as f: body = json.load(f) - access_key_value = body['data']['access_token'] - sessdata_value = body['data']['cookie_info']['cookies'][0]['value'] - bili_jct_value = body['data']['cookie_info']['cookies'][1]['value'] - dede_user_id_value = body['data']['cookie_info']['cookies'][2]['value'] - dede_user_id_ckmd5_value = body['data']['cookie_info']['cookies'][3]['value'] - sid_value = body['data']['cookie_info']['cookies'][4]['value'] - Model().save_cookies_info(access_key_value, sessdata_value, bili_jct_value, dede_user_id_value, dede_user_id_ckmd5_value, sid_value) + access_key_value = body["data"]["access_token"] + sessdata_value = body["data"]["cookie_info"]["cookies"][0]["value"] + bili_jct_value = body["data"]["cookie_info"]["cookies"][1]["value"] + dede_user_id_value = body["data"]["cookie_info"]["cookies"][2]["value"] + dede_user_id_ckmd5_value = body["data"]["cookie_info"]["cookies"][3][ + "value" + ] + sid_value = body["data"]["cookie_info"]["cookies"][4]["value"] + Model().save_cookies_info( + access_key_value, + sessdata_value, + bili_jct_value, + dede_user_id_value, + dede_user_id_ckmd5_value, + sid_value, + ) if CheckBiliLogin().check_bili_login(): print("Login success!", flush=True) else: print("Login failed, please check the cookie file again", flush=True) - \ No newline at end of file diff --git a/bilitool/login/logout_bili.py b/bilitool/login/logout_bili.py index 5621880..2d04e6c 100644 --- a/bilitool/login/logout_bili.py +++ b/bilitool/login/logout_bili.py @@ -12,28 +12,26 @@ def __init__(self): self.config = Model().get_config() def logout_bili(self): - host = 'passport.bilibili.com' - path = '/login/exit/v2' + host = "passport.bilibili.com" + path = "/login/exit/v2" headers = { - 'Cookie': f'DedeUserID={self.config["cookies"]["DedeUserID"]}; bili_jct={self.config["cookies"]["bili_jct"]}; SESSDATA={self.config["cookies"]["SESSDATA"]}', - 'Content-Type': 'application/x-www-form-urlencoded' + "Cookie": f'DedeUserID={self.config["cookies"]["DedeUserID"]}; bili_jct={self.config["cookies"]["bili_jct"]}; SESSDATA={self.config["cookies"]["SESSDATA"]}', + "Content-Type": "application/x-www-form-urlencoded", } - data = { - 'biliCSRF': self.config["cookies"]["bili_jct"] - } + data = {"biliCSRF": self.config["cookies"]["bili_jct"]} encoded_data = urllib.parse.urlencode(data) connection = http.client.HTTPSConnection(host) - connection.request('POST', path, body=encoded_data, headers=headers) + connection.request("POST", path, body=encoded_data, headers=headers) response = connection.getresponse() - response_json = json.loads(response.read().decode('utf-8')) - if response_json['code'] == 0: + response_json = json.loads(response.read().decode("utf-8")) + if response_json["code"] == 0: print("Logout successfully, the cookie has expired") Model().reset_cookies() else: print("Logout failed, check the info:") print(response_json) - connection.close() \ No newline at end of file + connection.close() diff --git a/bilitool/model/model.py b/bilitool/model/model.py index 6ff17ba..905ec6e 100644 --- a/bilitool/model/model.py +++ b/bilitool/model/model.py @@ -9,15 +9,20 @@ def decorator(func): def wrapper(self, *args, **kwargs): headers = func(self, *args, **kwargs) config_info = self.get_config() - cookies = config_info['cookies'] - cookie_string = "; ".join([f"{key}={value}" for key, value in cookies.items() if value]) - headers['Cookie'] = cookie_string + cookies = config_info["cookies"] + cookie_string = "; ".join( + [f"{key}={value}" for key, value in cookies.items() if value] + ) + headers["Cookie"] = cookie_string if referer: - headers['Referer'] = referer + headers["Referer"] = referer return headers + return wrapper + return decorator + class Model: def __init__(self, path=None) -> None: if path is None: @@ -34,7 +39,7 @@ def __init__(self, path=None) -> None: "DedeUserID": "", "DedeUserID__ckMd5": "", "access_key": "", - "sid": "" + "sid": "", }, "upload": { "copyright": 2, @@ -44,14 +49,14 @@ def __init__(self, path=None) -> None: "tag": "bilitool", "source": "\u6765\u6e90\u4e8e\u4e92\u8054\u7f51", "cover": "", - "dynamic": "" + "dynamic": "", }, "download": { "danmaku": 1, "quality": 64, "chunksize": 1024, - "multiple": False - } + "multiple": False, + }, } def get_default_config(self): @@ -62,20 +67,22 @@ def reset_config(self): @add_headers_info() def get_headers_with_cookies(self): - return self.get_config()['headers'] + return self.get_config()["headers"] - @add_headers_info(referer='https://www.bilibili.com/') + @add_headers_info(referer="https://www.bilibili.com/") def get_headers_with_cookies_and_refer(self): - return self.get_config()['headers'] + return self.get_config()["headers"] - def save_cookies_info(self, access_key, sessdata, bili_jct, dede_user_id, dede_user_id_ckmd5, sid): + def save_cookies_info( + self, access_key, sessdata, bili_jct, dede_user_id, dede_user_id_ckmd5, sid + ): config_info = self.get_config() - config_info['cookies']['access_key'] = access_key - config_info['cookies']['SESSDATA'] = sessdata - config_info['cookies']['bili_jct'] = bili_jct - config_info['cookies']['DedeUserID'] = dede_user_id - config_info['cookies']['DedeUserID__ckMd5'] = dede_user_id_ckmd5 - config_info['cookies']['sid'] = sid + config_info["cookies"]["access_key"] = access_key + config_info["cookies"]["SESSDATA"] = sessdata + config_info["cookies"]["bili_jct"] = bili_jct + config_info["cookies"]["DedeUserID"] = dede_user_id + config_info["cookies"]["DedeUserID__ckMd5"] = dede_user_id_ckmd5 + config_info["cookies"]["sid"] = sid self.write(config_info) def update_specific_config(self, action, key, value): @@ -91,24 +98,24 @@ def update_multiple_config(self, action, updates: dict): def reset_upload_config(self): config_info = self.get_config() - config_info['upload']['copyright'] = 2 - config_info['upload']['title'] = "" - config_info['upload']['desc'] = "" - config_info['upload']['tid'] = 138 - config_info['upload']['tag'] = "bilitool" - config_info['upload']['source'] = "\u6765\u6e90\u4e8e\u4e92\u8054\u7f51" - config_info['upload']['cover'] = "" - config_info['upload']['dynamic'] = "" + config_info["upload"]["copyright"] = 2 + config_info["upload"]["title"] = "" + config_info["upload"]["desc"] = "" + config_info["upload"]["tid"] = 138 + config_info["upload"]["tag"] = "bilitool" + config_info["upload"]["source"] = "\u6765\u6e90\u4e8e\u4e92\u8054\u7f51" + config_info["upload"]["cover"] = "" + config_info["upload"]["dynamic"] = "" self.write(config_info) def reset_cookies(self): config_info = self.get_config() - config_info['cookies']['access_key'] = "" - config_info['cookies']['SESSDATA'] = "" - config_info['cookies']['bili_jct'] = "" - config_info['cookies']['DedeUserID'] = "" - config_info['cookies']['DedeUserID__ckMd5'] = "" - config_info['cookies']['sid'] = "" + config_info["cookies"]["access_key"] = "" + config_info["cookies"]["SESSDATA"] = "" + config_info["cookies"]["bili_jct"] = "" + config_info["cookies"]["DedeUserID"] = "" + config_info["cookies"]["DedeUserID__ckMd5"] = "" + config_info["cookies"]["sid"] = "" self.write(config_info) def get_config(self): diff --git a/bilitool/upload/bili_upload.py b/bilitool/upload/bili_upload.py index b99bc24..55dcaa7 100644 --- a/bilitool/upload/bili_upload.py +++ b/bilitool/upload/bili_upload.py @@ -13,6 +13,8 @@ from ..model.model import Model import hashlib import time +import base64 + class BiliUploader(object): def __init__(self, logger): @@ -20,37 +22,85 @@ def __init__(self, logger): self.config = Model().get_config() self.session = requests.Session() self.session.headers = self.config["headers"] - self.session.cookies = requests.utils.cookiejar_from_dict(self.config["cookies"]) + self.session.cookies = requests.utils.cookiejar_from_dict( + self.config["cookies"] + ) self.headers = Model().get_headers_with_cookies_and_refer() + def cover_up(self, img: str): + """Upload the cover image + Parameters + ---------- + - img: img path or stream + Returns + ------- + - url: str + the url of the cover image in bili server + """ + from PIL import Image + from io import BytesIO + + with Image.open(img) as im: + # you should keep the image ratio 16:10 + xsize, ysize = im.size + if xsize / ysize > 1.6: + delta = xsize - ysize * 1.6 + region = im.crop((delta / 2, 0, xsize - delta / 2, ysize)) + else: + delta = ysize - xsize * 10 / 16 + region = im.crop((0, delta / 2, xsize, ysize - delta / 2)) + buffered = BytesIO() + region.save(buffered, format=im.format) + r = self.session.post( + url="https://member.bilibili.com/x/vu/web/cover/up", + data={ + "cover": b"data:image/jpeg;base64," + + (base64.b64encode(buffered.getvalue())), + "csrf": self.config["cookies"]["bili_jct"], + }, + timeout=30, + ) + buffered.close() + res = r.json() + if res.get("data") is None: + raise Exception(res) + self.logger.info(f"the cover image has been uploaded as {res['data']['url']}") + return res["data"]["url"] + def probe(self): self.logger.info("begin to probe the best cdn line") - ret = requests.get('https://member.bilibili.com/preupload?r=probe', headers=self.headers, timeout=5).json() + ret = requests.get( + "https://member.bilibili.com/preupload?r=probe", + headers=self.headers, + timeout=5, + ).json() data, auto_os = None, None min_cost = 0 - if ret['probe'].get('get'): - method = 'get' + if ret["probe"].get("get"): + method = "get" else: - method = 'post' + method = "post" data = bytes(int(1024 * 0.1 * 1024)) - for line in ret['lines']: + for line in ret["lines"]: start = time.perf_counter() - test = requests.request(method, f"https:{line['probe_url']}", data=data, timeout=30) + test = requests.request( + method, f"https:{line['probe_url']}", data=data, timeout=30 + ) cost = time.perf_counter() - start - print(line['query'], cost) + print(line["query"], cost) if test.status_code != 200: return if not min_cost or min_cost > cost: auto_os = line min_cost = cost - auto_os['cost'] = min_cost + auto_os["cost"] = min_cost self.logger.info(f"the best cdn line is:{auto_os}") - upos_url = auto_os['probe_url'].rstrip('OK') + upos_url = auto_os["probe_url"].rstrip("OK") self.logger.info(f"the upos_url is:{upos_url}") - query_params = dict(param.split('=') for param in auto_os['query'].split('&')) - cdn = query_params.get('upcdn') + query_params = dict(param.split("=") for param in auto_os["query"].split("&")) + cdn = query_params.get("upcdn") self.logger.info(f"the cdn is:{cdn}") - probe_version = query_params.get('probe_version') + probe_version = query_params.get("probe_version") self.logger.info(f"the probe_version is:{probe_version}") return upos_url, cdn, probe_version @@ -76,26 +126,24 @@ def preupload(self, *, filename, filesize, cdn, probe_version): So I ask a question on the V2EX: https://v2ex.com/t/1103152 Finally, the netizens reckon that may be the translation style of bilibili. """ - url = 'https://member.bilibili.com/preupload' + url = "https://member.bilibili.com/preupload" params = { - 'name': filename, - 'size': filesize, + "name": filename, + "size": filesize, # The parameters below are fixed - 'r': 'upos', - 'profile': 'ugcupos/bup', - 'ssl': 0, - 'version': '2.8.9', - 'build': '2080900', - 'upcdn': cdn, - 'probe_version': probe_version + "r": "upos", + "profile": "ugcupos/bup", + "ssl": 0, + "version": "2.8.9", + "build": "2080900", + "upcdn": cdn, + "probe_version": probe_version, } res_json = self.session.get( - url, - params=params, - headers={'TE': 'Trailers'} + url, params=params, headers={"TE": "Trailers"} ).json() - assert res_json['OK'] == 1 - self.logger.info('Completed preupload phase') + assert res_json["OK"] == 1 + self.logger.info("Completed preupload phase") # print(res_json) return res_json @@ -113,14 +161,25 @@ def get_upload_video_id(self, *, upos_uri, auth, upos_url): - upload_id: str the id of the video to be uploaded """ - url = f'https:{upos_url}{upos_uri}?uploads&output=json' - res_json = self.session.post(url, headers={'X-Upos-Auth': auth}).json() - assert res_json['OK'] == 1 - self.logger.info('Completed upload_id obtaining phase') + url = f"https:{upos_url}{upos_uri}?uploads&output=json" + res_json = self.session.post(url, headers={"X-Upos-Auth": auth}).json() + assert res_json["OK"] == 1 + self.logger.info("Completed upload_id obtaining phase") # print(res_json) return res_json - def upload_video_in_chunks(self, *, upos_uri, auth, upload_id, fileio, filesize, chunk_size, chunks, upos_url): + def upload_video_in_chunks( + self, + *, + upos_uri, + auth, + upload_id, + fileio, + filesize, + chunk_size, + chunks, + upos_url, + ): """Upload the video in chunks. Parameters @@ -140,35 +199,40 @@ def upload_video_in_chunks(self, *, upos_uri, auth, upload_id, fileio, filesize, - chunks: int the number of chunks to be uploaded """ - url = f'https:{upos_url}{upos_uri}' + url = f"https:{upos_url}{upos_uri}" params = { - 'partNumber': None, # start from 1 - 'uploadId': upload_id, - 'chunk': None, # start from 0 - 'chunks': chunks, - 'size': None, # current batch size - 'start': None, - 'end': None, - 'total': filesize, + "partNumber": None, # start from 1 + "uploadId": upload_id, + "chunk": None, # start from 0 + "chunks": chunks, + "size": None, # current batch size + "start": None, + "end": None, + "total": filesize, } # Single thread upload - with tqdm(total=filesize, desc="Uploading video", unit="B", unit_scale=True) as pbar: + with tqdm( + total=filesize, desc="Uploading video", unit="B", unit_scale=True + ) as pbar: for chunknum in range(chunks): start = fileio.tell() batchbytes = fileio.read(chunk_size) - params['partNumber'] = chunknum + 1 - params['chunk'] = chunknum - params['size'] = len(batchbytes) - params['start'] = start - params['end'] = fileio.tell() - res = self.session.put(url, params=params, data=batchbytes, headers={ - 'X-Upos-Auth': auth}) + params["partNumber"] = chunknum + 1 + params["chunk"] = chunknum + params["size"] = len(batchbytes) + params["start"] = start + params["end"] = fileio.tell() + res = self.session.put( + url, params=params, data=batchbytes, headers={"X-Upos-Auth": auth} + ) assert res.status_code == 200 - self.logger.debug(f'Completed chunk{chunknum+1} uploading') + self.logger.debug(f"Completed chunk{chunknum+1} uploading") pbar.update(len(batchbytes)) # print(res) - def finish_upload(self, *, upos_uri, auth, filename, upload_id, biz_id, chunks, upos_url): + def finish_upload( + self, *, upos_uri, auth, filename, upload_id, biz_id, chunks, upos_url + ): """Notify the all chunks have been uploaded. Parameters @@ -186,19 +250,19 @@ def finish_upload(self, *, upos_uri, auth, filename, upload_id, biz_id, chunks, - chunks: int the number of chunks to be uploaded """ - url = f'https:{upos_url}{upos_uri}' + url = f"https:{upos_url}{upos_uri}" params = { - 'output': 'json', - 'name': filename, - 'profile' : 'ugcupos/bup', - 'uploadId': upload_id, - 'biz_id': biz_id + "output": "json", + "name": filename, + "profile": "ugcupos/bup", + "uploadId": upload_id, + "biz_id": biz_id, } - data = {"parts": [{"partNumber": i, "eTag": "etag"} - for i in range(chunks, 1)]} - res_json = self.session.post(url, params=params, json=data, - headers={'X-Upos-Auth': auth}).json() - assert res_json['OK'] == 1 + data = {"parts": [{"partNumber": i, "eTag": "etag"} for i in range(chunks, 1)]} + res_json = self.session.post( + url, params=params, json=data, headers={"X-Upos-Auth": auth} + ).json() + assert res_json["OK"] == 1 # print(res_json) # API docs: https://github.com/SocialSisterYi/bilibili-API-collect/blob/master/docs/creativecenter/upload.md @@ -206,57 +270,67 @@ def publish_video(self, bilibili_filename): """publish the uploaded video""" config = Model().get_config() url = f'https://member.bilibili.com/x/vu/client/add?access_key={config["cookies"]["access_key"]}' - data = {'copyright': config["upload"]["copyright"], - 'videos': [{'filename': bilibili_filename, - 'title': config["upload"]["title"], - 'desc': config["upload"]["desc"]}], - 'source': config["upload"]["source"], - 'tid': config["upload"]["tid"], - 'title': config["upload"]["title"], - 'cover': config["upload"]["cover"], - 'tag': config["upload"]["tag"], - 'desc_format_id': 0, - 'desc': config["upload"]["desc"], - 'dynamic': config["upload"]["dynamic"], - 'subtitle': {'open': 0, 'lan': ''}} + data = { + "copyright": config["upload"]["copyright"], + "videos": [ + { + "filename": bilibili_filename, + "title": config["upload"]["title"], + "desc": config["upload"]["desc"], + } + ], + "source": config["upload"]["source"], + "tid": config["upload"]["tid"], + "title": config["upload"]["title"], + "cover": config["upload"]["cover"], + "tag": config["upload"]["tag"], + "desc_format_id": 0, + "desc": config["upload"]["desc"], + "dynamic": config["upload"]["dynamic"], + "subtitle": {"open": 0, "lan": ""}, + } if config["upload"]["copyright"] != 2: - del data['source'] - res_json = self.session.post(url, json=data, headers={'TE': 'Trailers'}).json() + del data["source"] + res_json = self.session.post(url, json=data, headers={"TE": "Trailers"}).json() # print(res_json) return res_json def get_updated_video_info(self, bvid: str): url = f"http://member.bilibili.com/x/client/archive/view" params = { - "access_key": Model().get_config()['cookies']['access_key'], - "bvid": bvid + "access_key": Model().get_config()["cookies"]["access_key"], + "bvid": bvid, } resp = requests.get(url=url, headers=self.headers, params=params) return resp.json()["data"] - + def get_video_list_info(self, bvid: str): raw_data = self.get_updated_video_info(bvid) # print(raw_data) videos = [] - for video in raw_data['videos']: - videos.append({ - 'filename': video['filename'], - 'title': video['title'], - 'desc': video['desc'] - }) + for video in raw_data["videos"]: + videos.append( + { + "filename": video["filename"], + "title": video["title"], + "desc": video["desc"], + } + ) - data = {'bvid': bvid, - 'build': 1054, - 'copyright': raw_data["archive"]["copyright"], - 'videos': videos, - 'source': raw_data["archive"]["source"], - 'tid': raw_data["archive"]["tid"], - 'title': raw_data["archive"]["title"], - 'cover': raw_data["archive"]["cover"], - 'tag': raw_data["archive"]["tag"], - 'no_reprint': raw_data["archive"]["no_reprint"], - 'open_elec': raw_data["archive_elec"]["state"], - 'desc': raw_data["archive"]["desc"]} + data = { + "bvid": bvid, + "build": 1054, + "copyright": raw_data["archive"]["copyright"], + "videos": videos, + "source": raw_data["archive"]["source"], + "tid": raw_data["archive"]["tid"], + "title": raw_data["archive"]["title"], + "cover": raw_data["archive"]["cover"], + "tag": raw_data["archive"]["tag"], + "no_reprint": raw_data["archive"]["no_reprint"], + "open_elec": raw_data["archive_elec"]["state"], + "desc": raw_data["archive"]["desc"], + } return data @staticmethod @@ -284,31 +358,29 @@ def append_video(self, bilibili_filename, video_name, data): """append the uploaded video""" # Parse JSON string to dict if data is a string video_to_be_appended = { - 'filename': bilibili_filename, - 'title': video_name, - 'desc': "", + "filename": bilibili_filename, + "title": video_name, + "desc": "", } - data['videos'].append(video_to_be_appended) + data["videos"].append(video_to_be_appended) headers = { - 'Connection': 'keep-alive', - 'Content-Type': 'application/json', - 'User-Agent': '', + "Connection": "keep-alive", + "Content-Type": "application/json", + "User-Agent": "", } params = { - 'access_key': Model().get_config()['cookies']['access_key'], + "access_key": Model().get_config()["cookies"]["access_key"], } - APPSECRET = 'af125a0d5279fd576c1b4418a3e8276d' - params['sign'] = BiliUploader.sign_dict(params, APPSECRET) + APPSECRET = "af125a0d5279fd576c1b4418a3e8276d" + params["sign"] = BiliUploader.sign_dict(params, APPSECRET) res_json = requests.post( url="http://member.bilibili.com/x/vu/client/edit", params=params, headers=headers, verify=False, - cookies={ - 'sid': Model().get_config()['cookies']['sid'] - }, + cookies={"sid": Model().get_config()["cookies"]["sid"]}, json=data, - ) + ) print(res_json.json()) - return res_json \ No newline at end of file + return res_json diff --git a/bilitool/utils/check_format.py b/bilitool/utils/check_format.py index bc2b12c..4efadf8 100644 --- a/bilitool/utils/check_format.py +++ b/bilitool/utils/check_format.py @@ -1,5 +1,6 @@ # Copyright (c) 2025 bilitool + class CheckFormat(object): def __init__(self): self.XOR_CODE = 23442827791579 @@ -18,14 +19,14 @@ def __init__(self): def is_bvid(bvid: str) -> bool: if len(bvid) != 12: return False - if bvid[0:2] != 'BV': + if bvid[0:2] != "BV": return False return True @staticmethod def is_chinese(word: str) -> bool: for ch in word: - if '\u4e00' <= ch <= '\u9fff': + if "\u4e00" <= ch <= "\u9fff": return True return False @@ -48,7 +49,7 @@ def bv2av(self, bvid: str) -> int: idx = self.ALPHABET.index(bvid[self.DECODE_MAP[i]]) tmp = tmp * self.BASE + idx return (tmp & self.MASK_CODE) ^ self.XOR_CODE - + def convert_bv_and_av(self, vid: str): if self.is_bvid(str(vid)): print("The avid of the video is: ", self.bv2av(str(vid))) diff --git a/bilitool/utils/get_ip_info.py b/bilitool/utils/get_ip_info.py index e19b772..4249a21 100644 --- a/bilitool/utils/get_ip_info.py +++ b/bilitool/utils/get_ip_info.py @@ -5,41 +5,43 @@ import json import inspect + def suppress_print_in_unittest(func): def wrapper(*args, **kwargs): # Check if the caller is a unittest for frame_info in inspect.stack(): - if 'unittest' in frame_info.filename: + if "unittest" in frame_info.filename: # If called from unittest, suppress print return func(*args, **kwargs) - + result = func(*args, **kwargs) if result: addr, isp, location, position = result print(f"IP: {addr}, ISP: {isp}, Location: {location}, Position: {position}") return result + return wrapper class IPInfo: @staticmethod def get_ip_address(ip=None): - url = 'https://api.live.bilibili.com/ip_service/v1/ip_service/get_ip_addr' + url = "https://api.live.bilibili.com/ip_service/v1/ip_service/get_ip_addr" if ip: - params = urllib.parse.urlencode({'ip': ip}) + params = urllib.parse.urlencode({"ip": ip}) full_url = f"{url}?{params}" else: full_url = url parsed_url = urllib.parse.urlparse(full_url) host = parsed_url.netloc - path = parsed_url.path + ('?' + parsed_url.query if parsed_url.query else '') + path = parsed_url.path + ("?" + parsed_url.query if parsed_url.query else "") connection = http.client.HTTPSConnection(host) connection.request("GET", path) response = connection.getresponse() - data = json.loads(response.read().decode('utf-8')) + data = json.loads(response.read().decode("utf-8")) connection.close() return IPInfo.print_ip_info(data) @@ -47,11 +49,15 @@ def get_ip_address(ip=None): @staticmethod @suppress_print_in_unittest def print_ip_info(ip_info): - if ip_info['code'] != 0: + if ip_info["code"] != 0: return None else: - addr = ip_info['data']['addr'] - isp = ip_info['data']['isp'] - location = ip_info['data']['country'] + ip_info['data']['province'] + ip_info['data']['city'] - position = ip_info['data']['latitude'] + ',' + ip_info['data']['longitude'] - return addr, isp, location, position \ No newline at end of file + addr = ip_info["data"]["addr"] + isp = ip_info["data"]["isp"] + location = ( + ip_info["data"]["country"] + + ip_info["data"]["province"] + + ip_info["data"]["city"] + ) + position = ip_info["data"]["latitude"] + "," + ip_info["data"]["longitude"] + return addr, isp, location, position diff --git a/bilitool/utils/parse_cookies.py b/bilitool/utils/parse_cookies.py index 0d84e83..ea47dfb 100644 --- a/bilitool/utils/parse_cookies.py +++ b/bilitool/utils/parse_cookies.py @@ -4,30 +4,32 @@ import os import sys + def parse_cookies(cookies_path): try: - with open(cookies_path, 'r') as file: + with open(cookies_path, "r") as file: data = json.load(file) except FileNotFoundError: return "Error: Cookies file not found." except json.JSONDecodeError: return "Error: Failed to decode JSON from cookies file." - cookies = data.get('data', {}).get('cookie_info', {}).get('cookies', []) + cookies = data.get("data", {}).get("cookie_info", {}).get("cookies", []) sessdata_value = None bili_jct_value = None for cookie in cookies: - if cookie['name'] == 'SESSDATA': - sessdata_value = cookie['value'] - elif cookie['name'] == 'bili_jct': - bili_jct_value = cookie['value'] + if cookie["name"] == "SESSDATA": + sessdata_value = cookie["value"] + elif cookie["name"] == "bili_jct": + bili_jct_value = cookie["value"] if not sessdata_value or not bili_jct_value: return "Error: Required cookies not found." return sessdata_value, bili_jct_value + if __name__ == "__main__": - sessdata, bili_jct = parse_cookies('') \ No newline at end of file + sessdata, bili_jct = parse_cookies("") diff --git a/bilitool/utils/parse_yaml.py b/bilitool/utils/parse_yaml.py index 46f129a..a6991f4 100644 --- a/bilitool/utils/parse_yaml.py +++ b/bilitool/utils/parse_yaml.py @@ -2,21 +2,23 @@ import yaml + def parse_yaml(yaml_path): - with open(yaml_path, 'r', encoding='utf-8') as file: + with open(yaml_path, "r", encoding="utf-8") as file: data = yaml.safe_load(file) - + # Assuming there's only one streamer entry - copyright = data.get('copyright') - tid = data.get('tid') - title = data.get('title') - desc = data.get('desc') - tag = data.get('tag') - source = data.get('source') - cover = data.get('cover') - dynamic = data.get('dynamic') + copyright = data.get("copyright") + tid = data.get("tid") + title = data.get("title") + desc = data.get("desc") + tag = data.get("tag") + source = data.get("source") + cover = data.get("cover") + dynamic = data.get("dynamic") return copyright, tid, title, desc, tag, source, cover, dynamic -if __name__ == '__main__': - res = parse_yaml('') - print(res) \ No newline at end of file + +if __name__ == "__main__": + res = parse_yaml("") + print(res) diff --git a/tests/test_feed.py b/tests/test_feed.py index defd394..61d21cf 100644 --- a/tests/test_feed.py +++ b/tests/test_feed.py @@ -4,6 +4,7 @@ from bilitool.feed.bili_video_list import BiliVideoList from bilitool.feed.bili_live_list import BiliLiveList + class TestBiliList(unittest.TestCase): def setUp(self): self.headers = { @@ -12,12 +13,12 @@ def setUp(self): def test_get_bili_video_list(self): bili = BiliVideoList() - bili.print_video_list_info(bili.get_bili_video_list(50, 'not_pubed')) + bili.print_video_list_info(bili.get_bili_video_list(50, "not_pubed")) def test_print_video_info_via_bvid(self): bili = BiliVideoList() - bili.print_video_info_via_bvid('BV1pCr6YcEgD') + bili.print_video_info_via_bvid("BV1pCr6YcEgD") def test_get_live_info(self): bili = BiliLiveList(self.headers) - print(bili.get_live_info(25538755)) \ No newline at end of file + print(bili.get_live_info(25538755)) diff --git a/tests/test_upload.py b/tests/test_upload.py index 55c8abd..2c8dddf 100644 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -4,8 +4,9 @@ import logging from bilitool.upload.bili_upload import BiliUploader + class TestBiliUploader(unittest.TestCase): def test_get_updated_video_info(self): - logger = logging.getLogger('bilitool') + logger = logging.getLogger("bilitool") bili = BiliUploader(logger) - print(bili.get_updated_video_info('BVXXXXXXXXX')) + print(bili.get_updated_video_info("BVXXXXXXXXX")) diff --git a/tests/test_utils.py b/tests/test_utils.py index 6f99571..917398b 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -7,15 +7,22 @@ class TestIPInfo(unittest.TestCase): def test_get_ip_address(self): - self.assertEqual(IPInfo.get_ip_address('12.12.12.12'), - ('12.12.12.12', 'att.com', '美国阿拉斯加州安克雷奇', '61.108841,-149.373145')) + self.assertEqual( + IPInfo.get_ip_address("12.12.12.12"), + ( + "12.12.12.12", + "att.com", + "美国阿拉斯加州安克雷奇", + "61.108841,-149.373145", + ), + ) class TestCheckFormat(unittest.TestCase): def test_av2bv(self): check_format = CheckFormat() - self.assertEqual(check_format.av2bv(2), 'BV1xx411c7mD') + self.assertEqual(check_format.av2bv(2), "BV1xx411c7mD") def test_bv2av(self): check_format = CheckFormat() - self.assertEqual(check_format.bv2av('BV1y7411Q7Eq'), 99999999) + self.assertEqual(check_format.bv2av("BV1y7411Q7Eq"), 99999999)