# django+constance+asgi+signals+apscheduler添加任务记录
3 min read
1.在apscheduler里增加任务(manualcmd1.py),一共有两个地方:
# 1.增加签到功能def sign_aliyunpan(): """ 阿里云盘签到 接口参考:https://github.com/Anonym-w/autoSigninAliyun/blob/main/autoSignin.js """ update_access_token_url = "https://auth.aliyundrive.com/v2/account/token" signin_url = "https://member.aliyundrive.com/v1/activity/sign_in_list" refresh_token_arr = ["自己的token", ]
headers = {'Content-Type': 'application/json'}
for refresh_token in refresh_token_arr: query_body = { 'grant_type': 'refresh_token', 'refresh_token': refresh_token }
# 使用 refresh_token 更新 access_token res = requests.post(update_access_token_url, headers=headers, data=json.dumps(query_body)) if res.status_code == 200: json_data = json.loads(res.text) access_token = json_data['access_token'] print(f"Access Token: {access_token}")
# 签到 headers['Authorization'] = f"Bearer {access_token}" res = requests.post(signin_url, headers=headers, data=json.dumps(query_body)) if res.status_code == 200: json_data = json.loads(res.text) print(json_data) else: print(f"签到请求失败,状态码:{res.status_code}") else: print(f"更新 Access Token 请求失败,状态码:{res.status_code}")
#2.添加到任务 # 阿里云盘每日签到 scheduler.add_job( sign_aliyunpan, trigger=CronTrigger(day_of_week='mon-sun', hour='10', minute='15', second='01', jitter=600), # 周一到周日 id=global_var.get_value_("sign_aliyunpan"), # 你要执行的函数,在global_var里需要手动设置 max_instances=1, replace_existing=True, ) logger.info("Added job 'sign_aliyunpan'.")2.在settings.py添加constance选项:
from collections import OrderedDictCONSTANCE_CONFIG = OrderedDict([ # ('SITE_NAME', ('My Title', 'Website title')), # ('SITE_DESCRIPTION', ('', 'Website description')), # ('THEME', ('light-blue', 'Website theme')), ("DEBUG", (True, 'Debug mode')), ("SyncIP", (False, '修改Dnspod上的IP')), ("Sign_aliyunpan", (True, '阿里云盘每日签到')), ("Sign_hifini_func", (True, 'hifini每日签到')),])3.SomeSchedule.views里添加默认状态:
# 初始化django-constance里的选项config.DEBUG = True # 启动时设置debug为开启状态config.SyncIP = False # 启动时设置sync ip设置为关闭状态config.Sign_aliyunpan = True # 启动时设置Sign_aliyunpan设置开启状态config.Sign_hifini_func = True # 启动时设置Sign_hifini_func设置开启状态4.在global_var.py中添加变量:
def init_(): global global_dict global_dict = { "scheduler": BackgroundScheduler(timezone=settings.TIME_ZONE), "sign_aliyunpan": "sign_aliyunpan", "sign_hifini_func": "sign_hifini_func", "sync_ip": "sync_ip", }5.在signals.py中添加控制任务暂停和开启:
@receiver(config_updated)def constance_updated(sender, key, old_value, new_value, **kwargs): if key == 'DEBUG': settings.DEBUG = config.DEBUG print("DEBUG已变更", settings.DEBUG)
if key == "Sign_aliyunpan": if new_value: global_var.get_value_("scheduler").resume_job("sign_aliyunpan") print("sign_aliyunpan签到已开启") else: global_var.get_value_("scheduler").pause_job("sign_aliyunpan") print("sign_aliyunpan签到已关闭")