rss解析磁力链,利用迅雷或者aria2下载
从RSS源中获取动画集信息、通过SSH将URL转换为磁力链接进行解析(linux),然后使用迅雷应用程序开始下载的过程。在执行过程中,用户会被提示输入一些参数。这是Web抓取、SSH自动化和Windows GUI自动化的组合应用
import requests
import xml.etree.ElementTree as ET
import re
import paramiko
import datetime
import time
from pywinauto.application import Application
import time
import os
import win32gui
import win32con
def ssh_execute(hostname, port, username, password, command):
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到远程服务器
client.connect(hostname, port, username, password)
# 获取一个传输通道
channel = client.get_transport().open_session()
# 执行命令
channel.exec_command(command)
output = ''
while True: # 持续读取输出
if channel.exit_status_ready(): # 如果命令已经执行完毕
break
# 读取部分输出
output += channel.recv(1024).decode()
# print(output, end='')
# return output
# 关闭连接
client.close()
return output
# 替换为实际的服务器信息和命令
# ssh_console(['python3 /home/yys/script/zcl.py http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e'])
def replace_sensitive_chars(folder_name):
sensitive_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
for char in sensitive_chars:
folder_name = folder_name.replace(char, '_')
return folder_name
def fetch_rss_data(url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "zh-CN,zh;q=0.9",
}
response = requests.get(url, headers=headers)
lis = []
if response.status_code == 200:
root = ET.fromstring(response.content.decode('utf-8'))
items = root.findall(".//item")
for item in items:
enclosure_url = item.find("enclosure").get("url")
item_source_code = ET.tostring(item, encoding="utf-8").decode("utf-8")
tilte = ''
title_element = item.find("./title")
if title_element is not None:
# print("主标题:",title_element.text)
tilte += replace_sensitive_chars(title_element.text)
r_list = re.compile('<p><strong>(.*?)</strong></p>', re.S).findall(item_source_code)
if r_list and len(r_list[0]) < 100:
title += replace_sensitive_chars(r_list[0])
# print("副标题:", title)
else:
pass
# print("未找到副标题")
# print("链接:", enclosure_url)
# print("-" * 50)
lis.append({'title': tilte, 'url': enclosure_url})
return lis
else:
print(f"无法获取 RSS 源。状态码: {response.status_code}")
def activate_xunlei_window():
# 替换成迅雷窗口的标题
xunlei_title = "迅雷"
# 查找迅雷窗口
xunlei_handle = win32gui.FindWindow(None, xunlei_title)
if xunlei_handle != 0:
# 将窗口激活到最前面
win32gui.ShowWindow(xunlei_handle, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(xunlei_handle)
print(f"成功激活迅雷窗口:{xunlei_title}")
else:
print(f"未找到迅雷窗口:{xunlei_title}")
def download_with_xunlei(url, download_folder_path):
# 启动迅雷应用程序
# app = Application(backend='uia').start(r"D:\app\Thunder\Program\Thunder.exe")
os.popen(r"D:\app\Thunder\Program\Thunder.exe")
activate_xunlei_window()
app = Application(backend='uia').connect(title="迅雷")
# # 等待迅雷主窗口出现
main_window = app.top_window()
main_window.wait('visible')
# 点击新建按钮
new_button = main_window.child_window(title="新建", control_type="Button")
new_button.click()
# 获取新建下载窗口
new_download_window = app.window(title="新建任务面板", control_type="Pane")
# print(new_download_window.print_control_identifiers())
# 输入下载链接
url_edit = new_download_window.child_window(found_index=0, control_type="Edit")
url_edit.set_text(url)
# 输入文件夹路径
folder_path_edit = new_download_window.child_window(found_index=1, control_type="Edit")
text = folder_path_edit.get_value().strip()
print('默认路径', text)
download_folder_path = download_folder_path.strip()
if text != download_folder_path:
folder_path_edit.set_text(download_folder_path)
print("修改路径为:", download_folder_path)
else:
print("无需修改路径", text)
# 点击下载按钮
download_button = new_download_window.child_window(title="立即下载", control_type="Button")
download_button.click()
# 等待下载完成
print("点击下载")
# app = Application(backend='uia').connect(title="迅雷")
for i in range(2):
try:
new_download_window.child_window(title="立即下载", control_type="Button").click()
break
except:
new_download_window.type_keys("{ENTER}")
pass
# Example usage:
if __name__ == "__main__":
hostname = '192.168.31.166'
port = 22 # SSH默认端口是22
username = ''
password = ''
url = "https://www.comicat.org/rss-METALLIC+ROUGE%5D%5B.xml"
ret = fetch_rss_data(url)
# print(ret)
set_file_name = ''
for one_data in ret:
url = one_data.get('url')
title = one_data.get('title')
print("标题=%s" % title)
file_name = input("收入标题名,按回车继续,(如不输入和上次名字相同):").strip()
if file_name == "c":
print("跳过")
continue
elif file_name:
set_file_name = file_name
print("设置文件名=【%s】" % set_file_name)
if not set_file_name:
print("请输入标题名字")
exit()
command = 'python3 /home/yys/script/zcl.py "%s"' % url
# 执行命令并获取结果,连接转磁力链
output = ssh_execute(hostname, port, username, password, command)
url = output.split('【')[-1].split('】')[0]
print(url)
folder_path = os.path.join(r"Z:\jellyfin\media\anime", set_file_name) # 替换为你的文件夹路径
download_with_xunlei(url, folder_path)
使用libtorrent 解析磁力链,这个库在windows不能运行,
sudo apt install python3-libtorrent
import libtorrent as lt
import requests
import sys
def torrent_to_magnet(torrent_url):
# 下载种子文件
response = requests.get(torrent_url, stream=True)
torrent_file = response.content
# 加载种子文件到libtorrent
info = lt.torrent_info(lt.bdecode(torrent_file))
# 生成磁力链接
magnet_link = lt.make_magnet_uri(info)
return magnet_link
arg = sys.argv
# 替换为实际的种子文件链接
torrent_link = "http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e"
if len(arg) >= 2:
torrent_link = str(arg[1])
# 转换为磁力链接
magnet_link = torrent_to_magnet(torrent_link)
# 打印磁力链接
print("磁力链接:【%s】" % magnet_link)
这个代码不能在windowns运行,libtorrent装上提示缺少dll库未解决,在mac和linux可以运行
这段代码主要用于从指定的RSS源中获取条目信息,然后使用Aria2进行下载。
主要函数和功能:
-
fetch_rss_data(url):
- 通过指定的URL获取RSS源的内容。
- 解析XML格式的RSS数据,提取每个条目的标题和下载链接。
- 将标题中的敏感字符(如斜杠、冒号等)替换为下划线。
-
get_bt_link(url):
- 从给定的URL下载种子文件,并使用libtorrent库解析种子文件。
- 生成磁力链接(magnet link)用于下载。
-
download_with_aria2_rpc(password, ip, url, download_path):
- 使用Aria2的RPC接口进行下载。
- 构建JSON-RPC请求,包括Aria2的认证token、下载链接和下载目录。
- 发送POST请求到Aria2的RPC服务器,开始下载任务。
主程序执行流程:
- 主程序通过指定的RSS源地址获取所有的RSS条目信息。
- 对每个条目,从条目中提取下载链接。
- 使用
get_bt_link(url)
获取磁力链接。 - 调用
download_with_aria2_rpc(password, ip, url, download_path)
开始下载任务到指定的下载路径。
注意事项:
- 代码中使用了第三方库requests和libtorrent,需要确保这些库已安装。
- Aria2需要预先配置并启用RPC功能,并确保主机和端口号与代码中的
ip
和端口号匹配。 - 确保在执行前替换所有实际的服务器信息、下载路径、Aria2 RPC密码和其他参数。
这段代码适合用于自动化从RSS源下载资源,并且通过Aria2实现高效的下载管理。
import requests
import xml.etree.ElementTree as ET
import re
import paramiko
import datetime
import time
import time
import os
def ssh_execute(hostname, port, username, password, command):
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到远程服务器
client.connect(hostname, port, username, password)
# 获取一个传输通道
channel = client.get_transport().open_session()
# 执行命令
channel.exec_command(command)
output = ''
while True: # 持续读取输出
if channel.exit_status_ready(): # 如果命令已经执行完毕
break
# 读取部分输出
output += channel.recv(1024).decode()
# print(output, end='')
# return output
# 关闭连接
client.close()
return output
# 替换为实际的服务器信息和命令
# ssh_console(['python3 /home/yys/script/zcl.py http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e'])
def replace_sensitive_chars(folder_name):
sensitive_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
for char in sensitive_chars:
folder_name = folder_name.replace(char, '_')
return folder_name
def fetch_rss_data(url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "zh-CN,zh;q=0.9",
}
response = requests.get(url, headers=headers)
lis = []
if response.status_code == 200:
root = ET.fromstring(response.content.decode('utf-8'))
items = root.findall(".//item")
for item in items:
enclosure_url = item.find("enclosure").get("url")
item_source_code = ET.tostring(item, encoding="utf-8").decode("utf-8")
tilte = ''
title_element = item.find("./title")
if title_element is not None:
# print("主标题:",title_element.text)
tilte += replace_sensitive_chars(title_element.text)
r_list = re.compile('<p><strong>(.*?)</strong></p>', re.S).findall(item_source_code)
if r_list and len(r_list[0]) < 100:
title += replace_sensitive_chars(r_list[0])
# print("副标题:", title)
else:
pass
# print("未找到副标题")
# print("链接:", enclosure_url)
# print("-" * 50)
lis.append({'title': tilte, 'url': enclosure_url})
return lis
else:
print(f"无法获取 RSS 源。状态码: {response.status_code}")
def activate_xunlei_window():
# 替换成迅雷窗口的标题
xunlei_title = "迅雷"
# 查找迅雷窗口
xunlei_handle = win32gui.FindWindow(None, xunlei_title)
if xunlei_handle != 0:
# 将窗口激活到最前面
win32gui.ShowWindow(xunlei_handle, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(xunlei_handle)
print(f"成功激活迅雷窗口:{xunlei_title}")
else:
print(f"未找到迅雷窗口:{xunlei_title}")
def download_with_xunlei(url, download_folder_path):
from pywinauto.application import Application
import win32gui
import win32con
# 启动迅雷应用程序
# app = Application(backend='uia').start(r"D:\app\Thunder\Program\Thunder.exe")
os.popen(r"D:\app\Thunder\Program\Thunder.exe")
activate_xunlei_window()
app = Application(backend='uia').connect(title="迅雷")
# # 等待迅雷主窗口出现
main_window = app.top_window()
main_window.wait('visible')
# 点击新建按钮
new_button = main_window.child_window(title="新建", control_type="Button")
new_button.click()
# 获取新建下载窗口
new_download_window = app.window(title="新建任务面板", control_type="Pane")
# print(new_download_window.print_control_identifiers())
# 输入下载链接
url_edit = new_download_window.child_window(found_index=0, control_type="Edit")
url_edit.set_text(url)
# 输入文件夹路径
folder_path_edit = new_download_window.child_window(found_index=1, control_type="Edit")
text = folder_path_edit.get_value().strip()
print('默认路径', text)
download_folder_path = download_folder_path.strip()
if text != download_folder_path:
folder_path_edit.set_text(download_folder_path)
print("修改路径为:", download_folder_path)
else:
print("无需修改路径", text)
# 点击下载按钮
download_button = new_download_window.child_window(title="立即下载", control_type="Button")
download_button.click()
# 等待下载完成
print("点击下载")
# app = Application(backend='uia').connect(title="迅雷")
for i in range(2):
try:
new_download_window.child_window(title="立即下载", control_type="Button").click()
break
except:
new_download_window.type_keys("{ENTER}")
pass
import requests
import json
def download_with_aria2_rpc(password, ip, url, download_path):
rpc_url = f"http://{ip}:6800/jsonrpc"
headers = {'Content-Type': 'application/json'}
# JSON-RPC payload to add new URI
payload = {
"jsonrpc": "2.0",
"method": "aria2.addUri",
"id": "1",
"params": [
f"token:{password}",
[url],
{"dir": download_path}
]
}
response = requests.post(rpc_url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
result = response.json()
if 'result' in result:
print("Download started successfully. GID:", result['result'])
else:
print("Failed to start download. Response:", result)
else:
print(f"Failed to connect to aria2 RPC server. Status code: {response.status_code}")
def get_bt_link(url):
import libtorrent as lt
import requests
import sys
# 下载种子文件
response = requests.get(url, stream=True)
torrent_file = response.content
# 加载种子文件到libtorrent
info = lt.torrent_info(lt.bdecode(torrent_file))
# 生成磁力链接
magnet_link = lt.make_magnet_uri(info)
return magnet_link
# Example usage:
if __name__ == "__main__":
# rss源地址
url = "https://www.comicat.org/rss-%5BGM-Team%5D%5B%E5%9B%BD%E6%BC%AB%5D%5B%E6%96%97%E7%BD%97%E5%A4%A7%E9%99%86%E2%85%A1+%E7%BB%9D%E4%B8%96%E5%94%90%E9%97%A8%5D%5BSoul+Land+%E2%85%A1%EF%BC%9AThe+Peerless+Tang+Clan%5D%5B2023%5D%5B56%5D%5BAVC%5D%5BGB%5D%5B1080P%5D.xml"
ret = fetch_rss_data(url) # 获取所有链接
# araia2
password = "123456" # aria2 RPC 密钥
ip = "192.168.31.113"
set_file_name = '斗罗大陆Ⅱ 绝世唐门' # 自动创建文件夹
download_path = "/jellyfin/media/anime/" + set_file_name # 这个安装了aria2的电脑的地址
for one_data in ret:
url = one_data.get('url')
title = one_data.get('title')
print("标题=%s" % title)
output = ''
for i in range(10):
try:
output = get_bt_link(url) # 获取磁力链接
break
except Exception as e:
print(e)
time.sleep(2)
url = output.split('【')[-1].split('】')[0]
print(url)
# 示例调用
download_with_aria2_rpc( password, ip, url, download_path)
本文作者: 永生
本文链接: https://yys.zone/detail/?id=337
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
发表评论
评论列表 (0 条评论)
暂无评论,快来抢沙发吧!