代码拉取完成,页面将自动刷新
import base64
import mimetypes
import os
import yaml
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ForceReply, Update, ReplyKeyboardMarkup
from telegram.constants import ParseMode
from telegram.ext import ApplicationBuilder, CallbackQueryHandler, CommandHandler, MessageHandler, filters, \
ConversationHandler, CallbackContext, ContextTypes
import requests
from datetime import datetime
import logging
from telegram.ext.filters import REPLY
# 设置日志配置
logging.basicConfig(filename='bot.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 读取配置文件
config_file_path = 'config.yml'
with open(config_file_path, 'r') as config_file:
config = yaml.safe_load(config_file)
owners = config.get('owners', [])
TOKEN = config['telegram']['token']
API_BASE_URL = config['api']['base_url']
API_AUTH_TOKEN = config['api']['auth_token']
UPLOAD_DIR_BASE = '/' # 基础上传目录
MENU, UPLOAD, NEW_DIRECTORY = range(3)
async def start(update, context):
user_id = update.message.chat_id
if 'user_data' not in context.chat_data:
context.chat_data['user_data'] = {}
if user_id not in context.chat_data['user_data']:
context.chat_data['user_data'][user_id] = {}
# 是否主人
logging.info(
'User {} {} joined the chat.'.format(update.message.from_user.first_name, update.message.from_user.last_name))
if is_owner(user_id, owners):
file_name = context.args[0] if context.args else None
logging.info('Parameter: {}'.format(file_name))
if file_name:
file_name = decode_base64(file_name)
await list_files(update, context, file_name, user_id)
else:
set_user_current_path(context, user_id, UPLOAD_DIR_BASE)
set_user_current_page(context, user_id, 1)
file_list, total_count = get_remote_file_list(context, user_id)
total_pages = calculate_total_pages(total_count)
current_path = get_user_current_path(context, user_id)
keyboard, text_list = build_keyboard(file_list, current_path, 1, total_pages)
logging.info(text_list)
reply_markup = ReplyKeyboardMarkup(keyboard)
from_user = update.message.from_user
await update.message.reply_text(
text=f'<b>{from_user.first_name} {from_user.last_name}</b>,欢迎使用资源の小站Bot!\n\n{text_list}',
reply_markup=reply_markup,
parse_mode=ParseMode.HTML
)
return MENU
else:
await update.message.reply_text(
'抱歉,您没有使用权限!如需使用,可发送 /configure 指令,配置自己的AList地址和Token信息。')
def configure(update, context):
user_id = update.message.chat_id
owners = config.get('owners', [])
if is_owner(user_id, owners):
update.message.reply_text('您已被授权直接使用,无需配置!')
return MENU
else:
update.message.reply_text('欢迎使用! 请提供您的AList的地址和令牌Token.\n\n'
'请先回复AList地址,结尾不要包含/:')
return UPLOAD
def handle_remote_api_url(update, context):
user_id = update.message.chat_id
context.chat_data['user_data'][user_id]['api_url'] = update.message.text
update.message.reply_text('AList地址设置成功.\n\n'
'请继续回复您的令牌Token:')
return UPLOAD
def handle_auth_token(update, context):
user_id = update.message.chat_id
context.chat_data['user_data'][user_id]['auth_token'] = update.message.text
update.message.reply_text('配置成功.\n\n'
'您现在可以使用了,开始吧!')
return MENU
async def handle_document(update, context):
logging.info('处理文件上传……')
user_id = update.message.chat_id
file = context.bot.getFile(update.message.document.file_id)
current_path = get_user_current_path(context, user_id)
headers = {
'Authorization': f'Bearer {API_AUTH_TOKEN}',
'Content-Type': 'multipart/form-data;', # Add Content-Type header
'File-Path': current_path, # Add File-Path header
'Content-Length': str(file.file_size) # Add Content-Length header
}
files = {'file': (update.message.document.file_name, file.download_as_bytearray())}
url = f'{API_BASE_URL}/api/fs/form'
try:
response = requests.post(url, headers=headers, data=None, files=files)
if response.status_code == 200:
await context.bot.send_message(user_id,
f'文件 "{update.message.document.file_name}" 成功上传至 {current_path}')
else:
error_message = response.json().get("message", "未知错误")
await context.bot.send_message(user_id, f'文件上传失败,参考信息: {error_message}')
except Exception as e:
logging.error(f'文件上传失败,错误信息: {str(e)}')
await context.bot.send_message(user_id, '文件上传失败,发生了意外错误。')
return MENU
def get_user_current_path(context, user_id):
return context.chat_data['user_data'][user_id].get('current_path', UPLOAD_DIR_BASE)
def get_user_current_page(context, user_id):
return context.chat_data['user_data'][user_id].get('current_page', 1)
def set_user_current_path(context, user_id, current_path):
context.chat_data['user_data'][user_id]['current_path'] = current_path
async def is_owner(user_id, owners):
return str(user_id) in map(str, owners)
def get_remote_file_list(context, user_id, current_path=None, current_page=None):
logging.info('查询远程目录……')
headers = {'Authorization': f'{API_AUTH_TOKEN}'}
if not current_path:
current_path = get_user_current_path(context, user_id)
if not current_page:
current_page = get_user_current_page(context, user_id)
data = {"path": current_path, "page": current_page, "per_page": 5}
url = f'{API_BASE_URL}/api/fs/list'
response = requests.post(url, headers=headers, data=data)
if response.status_code == 200:
json_data = response.json()
print(json_data)
if json_data['code'] == 200:
return json_data['data']['content'], json_data['data']['total']
return [], 0
else:
return [], 0
def format_size(size):
# 1024 Bytes = 1 KB
# 1024 KB = 1 MB
# 1024 MB = 1 GB
# ...
units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
unit_index = 0
while size >= 1024 and unit_index < len(units) - 1:
size /= 1024.0
unit_index += 1
return f'{size:.2f} {units[unit_index]}'
def format_datetime(input_datetime_str):
try:
# 尝试解析带有毫秒的格式
input_datetime = datetime.strptime(input_datetime_str, '%Y-%m-%dT%H:%M:%S.%f%z')
except ValueError:
# 如果解析失败,尝试解析不带毫秒的格式
input_datetime = datetime.strptime(input_datetime_str, '%Y-%m-%dT%H:%M:%SZ')
# 格式化为 yyyy-MM-dd HH:mm 格式
formatted_datetime = input_datetime.strftime("%Y-%m-%d %H:%M")
return formatted_datetime
def encode_base64(text):
encoded_bytes = base64.b64encode(text.encode('utf-8'))
return encoded_bytes.decode('utf-8')
# 示例函数,将 base64 编码的字符串解码
def decode_base64(encoded_text):
decoded_bytes = base64.b64decode(encoded_text.encode('utf-8'))
return decoded_bytes.decode('utf-8')
def build_keyboard(file_list, current_path, current_page, total_pages):
text_list = []
idx = 0
for file_info in file_list:
idx += 1
file_name = file_info['name']
file_size = file_info['size']
file_modi = file_info['modified']
is_dir = file_info['is_dir']
modi_time = format_datetime(file_modi)
formatted_size = format_size(file_size)
if is_dir:
file_icon = '📁'
else:
file_icon = '📋'
text_list.append(
f"{file_icon} {str(idx)}、<b style='font-size: 16px;'><a href='https://t.me/my_alist_bot?start={encode_base64(file_name)}'>{file_name}</a></b>\n<b style='color:blue;'>类型</b>:{'目录' if is_dir else '文件'} <b style='color:blue;'>大小</b>:{formatted_size} \n<b style='color:blue;'>更新时间</b>:{modi_time}")
navigation_buttons = []
if current_path != UPLOAD_DIR_BASE:
# Add a button to go to the root directory
navigation_buttons.append('根目录🏠')
# Add a button to go to the parent directory
navigation_buttons.append('上一级⤴️')
navigation_buttons.append('上传文件⬆️')
navigation_buttons.append('新增目录#️⃣')
# Add buttons for pagination
if current_page > 1:
navigation_buttons.append('上一页⬅️')
navigation_buttons.append(f'第{current_page}页/共{total_pages}页')
if current_page < total_pages:
navigation_buttons.append('下一页➡️')
keyboard = [navigation_buttons[i:i + 2] for i in range(0, len(navigation_buttons), 2)]
return keyboard, '\n\n'.join(text_list)
def set_user_current_page(context, user_id, current_page):
context.chat_data['user_data'][user_id]['current_page'] = current_page
async def list_files(update, context, query=None, user_id=None):
logging.info('处理文件查询……')
if not query:
user_id = update.callback_query.message.chat_id
query = update.callback_query.data.replace('list_files_', '')
owners = config.get('owners', [])
current_path = get_user_current_path(context, user_id)
logging.info("query: {}".format(query))
if is_owner(user_id, owners):
if query == 'home':
logging.info("返回主目录")
current_path = UPLOAD_DIR_BASE
set_user_current_path(context, user_id, current_path)
current_page = 1
set_user_current_page(context, user_id, current_page)
elif query == 'up':
logging.info("返回上级目录")
current_path = os.path.dirname(current_path)
set_user_current_path(context, user_id, current_path)
current_page = 1
set_user_current_page(context, user_id, current_page)
elif query == 'upload':
logging.info("准备上传文件")
context.chat_data['user_data'][user_id]['uploading'] = True
await context.bot.send_message(user_id, '请选择你要上传的文件')
return MENU
elif query.startswith('page_'):
current_page = int(query.split('_')[1])
logging.info("查询分页:{}/{}".format(current_path, current_page))
else:
# 检查用户点击的是文件还是目录
is_directory = is_remote_directory(context, user_id, query)
if is_directory:
try:
current_page = 1
current_path = os.path.join(current_path, query)
# 重新获取文件列表
file_list, total_count = get_remote_file_list(context, user_id, current_path, current_page)
total_pages = calculate_total_pages(total_count)
keyboard, text_list = build_keyboard(file_list, current_path, current_page, total_pages)
reply_markup = ReplyKeyboardMarkup(keyboard)
logging.info(text_list)
await context.bot.send_message(
chat_id=user_id,
text=f'当前位置: {current_path}\n\n{text_list}',
reply_markup=reply_markup,
parse_mode=ParseMode.HTML
)
# 更新用户查看的目录
set_user_current_path(context, user_id, current_path)
# 重置页码
set_user_current_page(context, user_id, current_page)
except Exception as e:
logging.error(e)
await list_files(update, context)
return MENU
else:
# 如果是文件,获取文件链接
file_url = get_remote_file_url(context, user_id, query)
if is_image_or_video(file_url):
# 图片或视频
await context.bot.send_photo(chat_id=user_id, photo=file_url)
else:
# 其他文件,发送链接
await context.bot.send_message(user_id, f"点击链接下载文件: <a href='{file_url}'>{query}</a>",
parse_mode=ParseMode.HTML)
return MENU
file_list, total_count = get_remote_file_list(context, user_id)
total_pages = calculate_total_pages(total_count)
keyboard, text_list = build_keyboard(file_list, current_path, current_page, total_pages)
reply_markup = ReplyKeyboardMarkup(keyboard)
await context.bot.edit_message_text(
chat_id=user_id,
message_id=update.callback_query.message.message_id,
text=f'当前位置: {current_path}\n\n{text_list}',
reply_markup=reply_markup,
parse_mode=ParseMode.HTML
)
set_user_current_page(context, user_id, current_page)
return MENU
else:
await context.bot.send_message(user_id,
'抱歉,您没有使用权限!如需使用,可发送 /configure 指令,配置自己的AList地址和Token信息。')
return ConversationHandler.END
def is_remote_directory(context, user_id, item):
headers = {'Authorization': f'{API_AUTH_TOKEN}'}
current_path = get_user_current_path(context, user_id)
data = {"path": f"{current_path}/{item}", "page": 1, "per_page": 5}
url = f'{API_BASE_URL}/api/fs/get'
# 获取文件详情
response = requests.post(url, headers=headers, data=data)
if response.status_code == 200:
json_data = response.json()
if json_data['code'] == 200:
return json_data['data']['is_dir']
return False
def get_remote_file_url(context, user_id, item):
current_path = get_user_current_path(context, user_id)
return f"{API_BASE_URL}{current_path}/{item}"
def is_image_or_video(file_url):
mime_type, _ = mimetypes.guess_type(file_url)
return mime_type and mime_type.startswith(('image', 'video'))
def calculate_total_pages(total_count, files_per_page=5):
total_pages = (total_count + files_per_page - 1) // files_per_page
return total_pages
ASK_NEW_DIRECTORY_NAME = range(3, 6)
async def ask_new_directory_name(update, context):
user_id = update.message.chat_id
await context.bot.send_message(user_id, '请输入新目录的名称:', reply_markup=ForceReply(selective=True))
return ASK_NEW_DIRECTORY_NAME
async def new_directory(update, context):
user_id = update.message.chat_id
context.chat_data['user_data'][user_id]['state'] = ASK_NEW_DIRECTORY_NAME
await context.bot.send_message(
chat_id=user_id,
text="请输入新目录的名称:",
reply_markup=ForceReply(
input_field_placeholder="请输入新目录的名称:",
selective=True,
),
)
return ASK_NEW_DIRECTORY_NAME
# 添加一个新的处理函数,用于处理用户回复的新目录名称
async def handle_new_directory_name(update, context):
user_id = update.message.chat_id
new_directory_name = update.message.text
# 返回到 MENU 状态
return list_files(update, context)
def main():
print('Starting bot...')
# 设置 logging 模块的日志级别和格式
logging.basicConfig(filename='bot.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
application = ApplicationBuilder().token(TOKEN).build()
# 添加日志处理器,将日志输出到控制台
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger('').addHandler(console_handler)
# Handle the start command
start_handler = CommandHandler('start', start)
application.add_handler(start_handler)
# Handle document uploads
conversation_handler = ConversationHandler(
per_chat=True,
per_user=True,
entry_points=[MessageHandler(filters.Document.ALL, handle_document)],
states={
MENU: [CallbackQueryHandler(list_files, pattern=r'^home|up|upload|(page_.*)|(list_files_.*)$')],
},
fallbacks=[],
)
application.add_handler(mkdir_handler)
application.add_handler(home_handler)
application.add_handler(conversation_handler)
# Handle file listing and navigation
list_files_handler = CallbackQueryHandler(list_files, pattern=r'^home|up|mkdir|upload|(page_.*)|(list_files_.*)$')
application.add_handler(list_files_handler)
application.add_error_handler(error_callback)
logging.info("Bot已启动")
application.run_polling()
async def mkdir_typing(chain: Update, context: ContextTypes.DEFAULT_TYPE) -> range:
logging.info(f"{chain.effective_user.name} 请求新建目录")
strict_reply_flag = ForceReply(
input_field_placeholder="请输入新目录的名称",
selective=True,
)
await context.bot.send_message(
chat_id=chain.effective_chat.id,
text="请输入新目录的名称",
reply_markup=strict_reply_flag,
reply_to_message_id=chain.effective_message.id,
)
return REPLY
async def error_callback(chain, context: CallbackContext):
logging.error(f"发生错误\n{chain}\n: {context.error}")
async def return_home(chain, context: CallbackContext):
logging.info(f"{chain.effective_user.name}")
user_id = chain.effective_user.id
set_user_current_path(context, user_id, UPLOAD_DIR_BASE)
await list_files(chain, context)
async def mkdir_replied(chain: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user_id = chain.effective_user.id
owner_flag = await is_owner(chain.effective_user.id, owners)
if not owner_flag:
await context.bot.send_message(
chat_id=chain.effective_chat.id,
text=owner_flag,
reply_to_message_id=chain.effective_message.id,
)
return ConversationHandler.END
logging.info(f"{chain.effective_user.name} 已回复新目录名称")
dir_name = chain.effective_message.text
if not dir_name:
await context.bot.send_message(
chat_id=chain.effective_chat.id,
text="未检测到目录名称",
reply_to_message_id=chain.effective_message.id,
)
return ConversationHandler.END
try:
# 调用接口新建目录
headers = {'Authorization': f'{API_AUTH_TOKEN}'}
current_path = get_user_current_path(context, user_id)
data = {'path': os.path.join(current_path, dir_name)}
url = f'{API_BASE_URL}/api/fs/mkdir'
response = requests.post(url, headers=headers, data=data)
if response.status_code == 200:
await context.bot.send_message(user_id, f'目录 "{dir_name}" 创建成功,位于 {current_path}')
else:
await context.bot.send_message(user_id, f'创建目录失败,参考信息: {response.json()["message"]}')
except Exception as e:
logging.error(e)
finally:
return ConversationHandler.END
mkdir_handler = ConversationHandler(
per_chat=True,
per_user=True,
entry_points=[MessageHandler(filters.Regex("新建目录"), mkdir_typing)],
states={REPLY: [MessageHandler(~filters.COMMAND, mkdir_replied)]},
fallbacks=[MessageHandler(~filters.COMMAND, mkdir_replied)],
)
home_handler = MessageHandler(filters.Regex("根目录"), return_home)
if __name__ == '__main__':
main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。