从RSS源中获取动画集信息、通过SSH将URL转换为磁力链接进行解析(linux),然后使用迅雷应用程序开始下载的过程。在执行过程中,用户会被提示输入一些参数。这是Web抓取、SSH自动化和Windows GUI自动化的组合应用

import requests
import xml.etree.ElementTree as ET
import re
import paramiko
import datetime
import time

from pywinauto.application import Application
import time
import os
import win32gui
import win32con

def ssh_execute(hostname, port, username, password, command):
    # 创建SSH客户端
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # 连接到远程服务器
    client.connect(hostname, port, username, password)

    # 获取一个传输通道
    channel = client.get_transport().open_session()

    # 执行命令
    channel.exec_command(command)
    output = ''
    while True:  # 持续读取输出
        if channel.exit_status_ready():  # 如果命令已经执行完毕
            break
        # 读取部分输出
        output += channel.recv(1024).decode()
        
        # print(output, end='')
        # return output
    # 关闭连接
    client.close()
    return output
# 替换为实际的服务器信息和命令



# ssh_console(['python3 /home/yys/script/zcl.py http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e'])

def replace_sensitive_chars(folder_name):
    sensitive_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
    for char in sensitive_chars:
        folder_name = folder_name.replace(char, '_')
    return folder_name

def fetch_rss_data(url):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept-Language": "zh-CN,zh;q=0.9",
    }

    response = requests.get(url, headers=headers)
    lis = []
    if response.status_code == 200:
        root = ET.fromstring(response.content.decode('utf-8'))
        items = root.findall(".//item")

        for item in items:
            enclosure_url = item.find("enclosure").get("url")
            item_source_code = ET.tostring(item, encoding="utf-8").decode("utf-8")
            
            tilte = ''
            title_element = item.find("./title")
            
            if title_element is not None:
                # print("主标题:",title_element.text)
                tilte += replace_sensitive_chars(title_element.text)
            r_list = re.compile('&lt;p&gt;&lt;strong&gt;(.*?)&lt;/strong&gt;&lt;/p&gt;', re.S).findall(item_source_code)
            if r_list and len(r_list[0]) < 100:
                title += replace_sensitive_chars(r_list[0])
                # print("副标题:", title)
                
            else:
                pass
                # print("未找到副标题")

            # print("链接:", enclosure_url)
            # print("-" * 50)
            lis.append({'title': tilte, 'url': enclosure_url})
        return lis
    else:
        print(f"无法获取 RSS 源。状态码: {response.status_code}")


def activate_xunlei_window():
    # 替换成迅雷窗口的标题
    xunlei_title = "迅雷"

    # 查找迅雷窗口
    xunlei_handle = win32gui.FindWindow(None, xunlei_title)

    if xunlei_handle != 0:
        # 将窗口激活到最前面
        win32gui.ShowWindow(xunlei_handle, win32con.SW_RESTORE)
        win32gui.SetForegroundWindow(xunlei_handle)
        print(f"成功激活迅雷窗口:{xunlei_title}")
    else:
        print(f"未找到迅雷窗口:{xunlei_title}")

def download_with_xunlei(url, download_folder_path):
    # 启动迅雷应用程序
    # app = Application(backend='uia').start(r"D:\app\Thunder\Program\Thunder.exe")
    os.popen(r"D:\app\Thunder\Program\Thunder.exe")
    activate_xunlei_window()
    app = Application(backend='uia').connect(title="迅雷")
    # # 等待迅雷主窗口出现

    main_window = app.top_window()
    main_window.wait('visible')

    # 点击新建按钮
    new_button = main_window.child_window(title="新建", control_type="Button")
    new_button.click()

    # 获取新建下载窗口
    new_download_window = app.window(title="新建任务面板", control_type="Pane")
    # print(new_download_window.print_control_identifiers())
    # 输入下载链接
    url_edit = new_download_window.child_window(found_index=0, control_type="Edit")
    url_edit.set_text(url)
    
    # 输入文件夹路径
    folder_path_edit = new_download_window.child_window(found_index=1, control_type="Edit")
    text = folder_path_edit.get_value().strip()
    print('默认路径', text)
    download_folder_path = download_folder_path.strip()
    if text != download_folder_path:
        folder_path_edit.set_text(download_folder_path)
        print("修改路径为:", download_folder_path)
    else:
        print("无需修改路径", text)
    # 点击下载按钮
    download_button = new_download_window.child_window(title="立即下载", control_type="Button")
    download_button.click()
    # 等待下载完成
    print("点击下载")
    # app = Application(backend='uia').connect(title="迅雷")
    for i in range(2):
        try:
            new_download_window.child_window(title="立即下载", control_type="Button").click()
            break
        except:
            new_download_window.type_keys("{ENTER}")
            pass

# Example usage:
if __name__ == "__main__":

    hostname = '192.168.31.166'
    port = 22  # SSH默认端口是22
    username = ''
    password = ''
    url = "https://www.comicat.org/rss-METALLIC+ROUGE%5D%5B.xml"
    ret = fetch_rss_data(url)
    # print(ret)

    set_file_name = ''
    for one_data in ret:
        url = one_data.get('url')
        title = one_data.get('title')
        print("标题=%s" % title)

        file_name = input("收入标题名,按回车继续,(如不输入和上次名字相同):").strip()

        if file_name == "c":
            print("跳过")
            continue

        elif file_name:
            set_file_name = file_name
        print("设置文件名=【%s】" % set_file_name)
        if not set_file_name:
            print("请输入标题名字")
            exit()
        command = 'python3 /home/yys/script/zcl.py "%s"' % url
        # 执行命令并获取结果,连接转磁力链
        output = ssh_execute(hostname, port, username, password, command)
        url = output.split('【')[-1].split('】')[0]
        print(url)
        folder_path = os.path.join(r"Z:\jellyfin\media\anime", set_file_name)  # 替换为你的文件夹路径
        download_with_xunlei(url, folder_path)

使用libtorrent 解析磁力链,这个库在windows不能运行,

sudo apt install python3-libtorrent
import libtorrent as lt
import requests
import sys
def torrent_to_magnet(torrent_url):
    # 下载种子文件
    response = requests.get(torrent_url, stream=True)
    torrent_file = response.content

    # 加载种子文件到libtorrent
    info = lt.torrent_info(lt.bdecode(torrent_file))

    # 生成磁力链接
    magnet_link = lt.make_magnet_uri(info)
    return magnet_link

arg = sys.argv
# 替换为实际的种子文件链接
torrent_link = "http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e"
if len(arg) >= 2:
    torrent_link = str(arg[1])

# 转换为磁力链接
magnet_link = torrent_to_magnet(torrent_link)

# 打印磁力链接
print("磁力链接:【%s】" % magnet_link)

 

2.使用aria2下载,全代码

这个代码不能在windowns运行,libtorrent装上提示缺少dll库未解决,在mac和linux可以运行

 

这段代码主要用于从指定的RSS源中获取条目信息,然后使用Aria2进行下载。

主要函数和功能:

  1. fetch_rss_data(url):

    • 通过指定的URL获取RSS源的内容。
    • 解析XML格式的RSS数据,提取每个条目的标题和下载链接。
    • 将标题中的敏感字符(如斜杠、冒号等)替换为下划线。
  2. get_bt_link(url):

    • 从给定的URL下载种子文件,并使用libtorrent库解析种子文件。
    • 生成磁力链接(magnet link)用于下载。
  3. download_with_aria2_rpc(password, ip, url, download_path):

    • 使用Aria2的RPC接口进行下载。
    • 构建JSON-RPC请求,包括Aria2的认证token、下载链接和下载目录。
    • 发送POST请求到Aria2的RPC服务器,开始下载任务。

主程序执行流程:

  • 主程序通过指定的RSS源地址获取所有的RSS条目信息。
  • 对每个条目,从条目中提取下载链接。
  • 使用get_bt_link(url)获取磁力链接。
  • 调用download_with_aria2_rpc(password, ip, url, download_path)开始下载任务到指定的下载路径。

注意事项:

  • 代码中使用了第三方库requests和libtorrent,需要确保这些库已安装。
  • Aria2需要预先配置并启用RPC功能,并确保主机和端口号与代码中的ip和端口号匹配。
  • 确保在执行前替换所有实际的服务器信息、下载路径、Aria2 RPC密码和其他参数。

这段代码适合用于自动化从RSS源下载资源,并且通过Aria2实现高效的下载管理。

import requests
import xml.etree.ElementTree as ET
import re
import paramiko
import datetime
import time


import time
import os

def ssh_execute(hostname, port, username, password, command):
    # 创建SSH客户端
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # 连接到远程服务器
    client.connect(hostname, port, username, password)

    # 获取一个传输通道
    channel = client.get_transport().open_session()

    # 执行命令
    channel.exec_command(command)
    output = ''
    while True:  # 持续读取输出
        if channel.exit_status_ready():  # 如果命令已经执行完毕
            break
        # 读取部分输出
        output += channel.recv(1024).decode()
        
        # print(output, end='')
        # return output
    # 关闭连接
    client.close()
    return output
# 替换为实际的服务器信息和命令



# ssh_console(['python3 /home/yys/script/zcl.py http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e'])

def replace_sensitive_chars(folder_name):
    sensitive_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
    for char in sensitive_chars:
        folder_name = folder_name.replace(char, '_')
    return folder_name

def fetch_rss_data(url):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept-Language": "zh-CN,zh;q=0.9",
    }

    response = requests.get(url, headers=headers)
    lis = []
    if response.status_code == 200:
        root = ET.fromstring(response.content.decode('utf-8'))
        items = root.findall(".//item")

        for item in items:
            enclosure_url = item.find("enclosure").get("url")
            item_source_code = ET.tostring(item, encoding="utf-8").decode("utf-8")
            
            tilte = ''
            title_element = item.find("./title")
            
            if title_element is not None:
                # print("主标题:",title_element.text)
                tilte += replace_sensitive_chars(title_element.text)
            r_list = re.compile('&lt;p&gt;&lt;strong&gt;(.*?)&lt;/strong&gt;&lt;/p&gt;', re.S).findall(item_source_code)
            if r_list and len(r_list[0]) < 100:
                title += replace_sensitive_chars(r_list[0])
                # print("副标题:", title)
                
            else:
                pass
                # print("未找到副标题")

            # print("链接:", enclosure_url)
            # print("-" * 50)
            lis.append({'title': tilte, 'url': enclosure_url})
        return lis
    else:
        print(f"无法获取 RSS 源。状态码: {response.status_code}")


def activate_xunlei_window():
    # 替换成迅雷窗口的标题
    xunlei_title = "迅雷"

    # 查找迅雷窗口
    xunlei_handle = win32gui.FindWindow(None, xunlei_title)

    if xunlei_handle != 0:
        # 将窗口激活到最前面
        win32gui.ShowWindow(xunlei_handle, win32con.SW_RESTORE)
        win32gui.SetForegroundWindow(xunlei_handle)
        print(f"成功激活迅雷窗口:{xunlei_title}")
    else:
        print(f"未找到迅雷窗口:{xunlei_title}")

def download_with_xunlei(url, download_folder_path):
    from pywinauto.application import Application
    import win32gui
    import win32con

    # 启动迅雷应用程序
    # app = Application(backend='uia').start(r"D:\app\Thunder\Program\Thunder.exe")
    os.popen(r"D:\app\Thunder\Program\Thunder.exe")
    activate_xunlei_window()
    app = Application(backend='uia').connect(title="迅雷")
    # # 等待迅雷主窗口出现

    main_window = app.top_window()
    main_window.wait('visible')

    # 点击新建按钮
    new_button = main_window.child_window(title="新建", control_type="Button")
    new_button.click()

    # 获取新建下载窗口
    new_download_window = app.window(title="新建任务面板", control_type="Pane")
    # print(new_download_window.print_control_identifiers())
    # 输入下载链接
    url_edit = new_download_window.child_window(found_index=0, control_type="Edit")
    url_edit.set_text(url)
    
    # 输入文件夹路径
    folder_path_edit = new_download_window.child_window(found_index=1, control_type="Edit")
    text = folder_path_edit.get_value().strip()
    print('默认路径', text)
    download_folder_path = download_folder_path.strip()
    if text != download_folder_path:
        folder_path_edit.set_text(download_folder_path)
        print("修改路径为:", download_folder_path)
    else:
        print("无需修改路径", text)
    # 点击下载按钮
    download_button = new_download_window.child_window(title="立即下载", control_type="Button")
    download_button.click()
    # 等待下载完成
    print("点击下载")
    # app = Application(backend='uia').connect(title="迅雷")
    for i in range(2):
        try:
            new_download_window.child_window(title="立即下载", control_type="Button").click()
            break
        except:
            new_download_window.type_keys("{ENTER}")
            pass

import requests
import json

def download_with_aria2_rpc(password, ip, url, download_path):
    rpc_url = f"http://{ip}:6800/jsonrpc"

    headers = {'Content-Type': 'application/json'}

    # JSON-RPC payload to add new URI
    payload = {
        "jsonrpc": "2.0",
        "method": "aria2.addUri",
        "id": "1",
        "params": [
            f"token:{password}",
            [url],
            {"dir": download_path}
        ]
    }

    response = requests.post(rpc_url, headers=headers, data=json.dumps(payload))

    if response.status_code == 200:
        result = response.json()
        if 'result' in result:
            print("Download started successfully. GID:", result['result'])
        else:
            print("Failed to start download. Response:", result)
    else:
        print(f"Failed to connect to aria2 RPC server. Status code: {response.status_code}")

def get_bt_link(url):
    import libtorrent as lt
    import requests
    import sys
     # 下载种子文件
    response = requests.get(url, stream=True)
    torrent_file = response.content

    # 加载种子文件到libtorrent
    info = lt.torrent_info(lt.bdecode(torrent_file))

    # 生成磁力链接
    magnet_link = lt.make_magnet_uri(info)
    return magnet_link

    
# Example usage:
if __name__ == "__main__":

    # rss源地址
    url = "https://www.comicat.org/rss-%5BGM-Team%5D%5B%E5%9B%BD%E6%BC%AB%5D%5B%E6%96%97%E7%BD%97%E5%A4%A7%E9%99%86%E2%85%A1+%E7%BB%9D%E4%B8%96%E5%94%90%E9%97%A8%5D%5BSoul+Land+%E2%85%A1%EF%BC%9AThe+Peerless+Tang+Clan%5D%5B2023%5D%5B56%5D%5BAVC%5D%5BGB%5D%5B1080P%5D.xml"
    ret = fetch_rss_data(url) # 获取所有链接
    # araia2
    password = "123456" # aria2 RPC 密钥
    ip = "192.168.31.113"
    set_file_name = '斗罗大陆Ⅱ 绝世唐门' # 自动创建文件夹
    download_path = "/jellyfin/media/anime/" + set_file_name  # 这个安装了aria2的电脑的地址

    for one_data in ret:
        url = one_data.get('url')
        title = one_data.get('title')
        print("标题=%s" % title)
        output = ''
        for i in range(10):
            try:
                output = get_bt_link(url) # 获取磁力链接
                break
            except Exception as e:
                print(e)
            time.sleep(2)
        url = output.split('【')[-1].split('】')[0]
        print(url)
        # 示例调用
        download_with_aria2_rpc( password, ip, url, download_path)