1. 使用time,threading 模块
- 第一种:
代码实现:
# -*- coding:utf-8 -*-
import threading
import time
cancel_tmr = false
def start():
#具体任务执行内容
print("hello world")
def heart_beat():
# 打印当前时间
print(time.strftime('%y-%m-%d %h:%m:%s'))
if not cancel_tmr:
start()
# 每隔3秒执行一次
threading.timer(3, heart_beat).start()
if __name__ == '__main__':
heart_beat()
# 15秒后停止定时器
time.sleep(15)
cancel_tmr = true
- 第二种:
# -*- coding:utf-8 -*-
import threading
import time
exec_count = 0
def start():
print("hello world", exec_count)
def heart_beat():
print(time.strftime('%y-%m-%d %h:%m:%s'))
global exec_count
exec_count = 1
# 执行15次后停止定时器
if exec_count < 15:
start()
threading.timer(5, heart_beat).start()
if __name__ == '__main__':
heart_beat()
2. 使用datetime,threading 模块
- 要求:每天凌晨3点执行func方法。
代码实现:
# -*- coding:utf-8 -*-
import datetime
import threading
def func():
print("haha")
# 如果需要循环调用,就要添加以下方法
timer = threading.timer(86400, func)
timer.start()
# 获取现在时间
now_time = datetime.datetime.now()
# 获取明天时间
next_time = now_time datetime.timedelta(days= 1)
next_year = next_time.date().year
next_month = next_time.date().month
next_day = next_time.date().day
# print(next_time, next_year, next_month, next_day)
# 获取明天3点时间
next_time = datetime.datetime.strptime(str(next_year) "-" str(next_month) "-" str(next_day) " 03:00:00", "%y-%m-%d %h:%m:%s")
# print(next_time)
# # 获取昨天时间
# last_time = now_time datetime.timedelta(days=-1)
# 获取距离明天3点时间,单位为秒
timer_start_time = (next_time - now_time).total_seconds()
print(timer_start_time)
# 54186.75975
# 定时器,参数为(多少时间后执行,单位为秒,执行的方法)
timer = threading.timer(timer_start_time, func)
timer.start()
3. 使用time,schedule 模块 (执行单个任务)
- schedule:一个轻量级的定时任务调度的库。
代码实现:
# !/usr/bin/env python
# -*- coding:utf-8 -*-
import schedule
import time
def job(text=""):
print(text, "i'm working...")
schedule.every().seconds.do(job, "每秒一次")
schedule.every(5).seconds.do(job, "五秒一次")
schedule.every(10).minutes.do(job, "10分钟一次")
schedule.every().hour.do(job, "1小时一次")
# 每天10:30执行
schedule.every().day.at("10:30").do(job)
# 每隔5到10天执行一次任务
schedule.every(5).to(10).days.do(job)
# 每周一的这个时候执行一次任务
schedule.every().monday.do(job)
# 每周三13:15执行一次任务
schedule.every().wednesday.at("13:15").do(job)
while true:
# run_pending:运行所有可以运行的任务
schedule.run_pending()
time.sleep(1)
4. 使用time,schedule 模块 (执行多个任务)
时间有冲突的执行多个任务
代码实现:
# -*- coding:utf-8 -*-
import schedule
import time
def job():
print("i'm working... in job1 start")
time.sleep(5)
print("i'm working... in job1 end")
def job2():
print("i'm working... in job2")
schedule.every(3).seconds.do(job)
schedule.every(3).seconds.do(job2)
while true:
schedule.run_pending()
time.sleep(1)
运行结果:
- 根据代码运行结果可以看出,schedule方法是串行的,代码中有时间冲突,所以执行完第一个任务,才会执行第二个任务,即任务二每隔3秒执行一次,而任务一执行时间是5秒。
- 如果不用多线程,则定时任务会不准确,因为任务会按照顺序执行,如果上一个任务比较耗时,则下一个任务就会"延误"
多线程并发运行多个任务
代码实现:
# -*- coding:utf-8 -*-
import datetime
import schedule
import threading
import time
def job1():
print("i'm working for job1 start", datetime.datetime.now())
time.sleep(5)
print("job1: end", datetime.datetime.now())
def job2():
print("i'm working for job2 start", datetime.datetime.now())
time.sleep(3)
print("job2: end", datetime.datetime.now())
def job1_task():
threading.thread(target=job1).start()
def job2_task():
threading.thread(target=job2).start()
def run():
schedule.every(3).seconds.do(job1_task)
schedule.every(3).seconds.do(job2_task)
while true:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
run()
代码运行结果:
5. 使用apscheduler 模块
apscheduler 模块详情介绍
apscheduler 模块详情介绍
代码实现
# -*- coding:utf-8 -*-
import time
import datetime
from apscheduler.schedulers.blocking import blockingscheduler
def my_job(text="默认值"):
print(text, time.strftime('%y-%m-%d %h:%m:%s', time.localtime(time.time())))
sched = blockingscheduler()
sched.add_job(my_job, 'interval', seconds=3, args=['3秒定时'])
# 2018-3-17 00:00:00 执行一次,args传递一个text参数
sched.add_job(my_job, 'date', run_date=datetime.date(2019, 10, 17), args=['根据年月日定时执行'])
# 2018-3-17 13:46:00 执行一次,args传递一个text参数
sched.add_job(my_job, 'date', run_date=datetime.datetime(2019, 10, 17, 14, 10, 0), args=['根据年月日时分秒定时执行'])
# sched.start()
"""
interval 间隔调度,参数如下:
weeks (int) – 间隔几周
days (int) – 间隔几天
hours (int) – 间隔几小时
minutes (int) – 间隔几分钟
seconds (int) – 间隔多少秒
start_date (datetime|str) – 开始日期
end_date (datetime|str) – 结束日期
timezone (datetime.tzinfo|str) – 时区
"""
"""
cron参数如下:
year (int|str) – 年,4位数字
month (int|str) – 月 (范围1-12)
day (int|str) – 日 (范围1-31)
week (int|str) – 周 (范围1-53)
day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – 时 (范围0-23)
minute (int|str) – 分 (范围0-59)
second (int|str) – 秒 (范围0-59)
start_date (datetime|str) – 最早开始日期(包含)
end_date (datetime|str) – 最晚结束时间(包含)
timezone (datetime.tzinfo|str) – 指定时区
"""
# my_job将会在6,7,8,11,12月的第3个周五的1,2,3点运行
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 截止到2018-12-30 00:00:00,每周一到周五早上五点半运行job_function
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2018-12-31')
# 表示2017年3月22日17时19分07秒执行该程序
sched.add_job(my_job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)
# 表示任务在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 表示从星期一到星期五5:30(am)直到2014-05-30 00:00:00
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# 表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5
sched.add_job(my_job, 'cron', second='*/5', args=['5秒定时'])
sched.start()