Запланировать повторяющееся событие в Python 3

Я пытаюсь запланировать повторяющееся событие для запуска каждую минуту в Python 3.

Я видел класс sched.scheduler, но мне интересно, есть ли другой способ сделать это. Я слышал упоминания, что мог бы использовать для этого несколько потоков, что я бы не прочь сделать.

Я в основном запрашиваю JSON, а затем анализирую его; его значение меняется со временем.

Чтобы использовать sched.scheduler, я должен создать цикл, чтобы запросить его, чтобы запланировать запуск даже на один час:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()

Какие еще есть способы сделать это?


person Humphrey Bogart    schedule 08.03.2010    source источник
comment
Дублирование всех вопросов о расписании для Python 2. Все из них: stackoverflow.com/search?q=%5Bpython %5D+расписание   -  person S.Lott    schedule 08.03.2010
comment
Дубликат: stackoverflow. ком/вопросы/373335/   -  person S.Lott    schedule 08.03.2010
comment
Также связано: stackoverflow.com/q/474528/3701431   -  person Sergiy Kolodyazhnyy    schedule 26.05.2018


Ответы (11)


Вы можете использовать threading.Timer, но это также запланирует разовое событие, аналогично методу .enter объектов планировщика.

Обычный шаблон (на любом языке) для преобразования одноразового планировщика в периодический планировщик состоит в том, чтобы каждое событие перепланировалось через указанный интервал. Например, с sched я бы не использовал цикл, как вы, а что-то вроде:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)

и инициировать весь «вечно периодический график» с помощью звонка

periodic(scheduler, 3600, query_rate_limit)

Или я мог бы использовать threading.Timer вместо scheduler.enter, но схема очень похожа.

Если вам нужно более точное изменение (например, остановить периодическое изменение расписания в определенное время или при определенных условиях), это не так уж сложно реализовать с помощью нескольких дополнительных параметров.

person Alex Martelli    schedule 08.03.2010
comment
Ну, в java у меня есть timer.scheduleAtFixedRate() и настоящая многопоточность. И все говорят, что на питоне мы пишем меньше кода... М-м-м... Просто говорю... - person user1685095; 13.12.2013
comment
@user1685095 user1685095 всегда есть исключения из любого такого обобщенного утверждения, к сожалению. - person Ponkadoodle; 14.01.2014
comment
@Wallacoloo означает ли это, что не всегда есть исключения? :) - person Rob Grant; 14.06.2016
comment
@ user1685095 не так быстро! Попробуйте кодировать это без множественного импорта, расширив TimerTask, предоставив метод run и добавив отдельный класс для запуска таймера, а также создав все эти объекты. Вероятно, около 15 строк кода. (Если у вас нет более чистого способа сделать это, я не лучший в Java.) - person jdk1.0; 22.04.2017
comment
Основываясь на ответе Алекса Мартелли, я реализовал версию декоратора, которую легче интегрировать. stackoverflow.com/a/48758861/482899 - person northtree; 13.02.2018

Вы можете использовать расписание. Он работает на Python 2.7 и 3.3 и довольно легковесен:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)
person dbader    schedule 28.05.2013
comment
почему цикл while? Разве он не будет работать как задания cron? - person Jaydev; 15.09.2016
comment
@Jaydev цикл while требуется, если код выполняется в основном потоке - person lurscher; 27.03.2018
comment
веб-сервер перестанет обслуживать файл с этим пакетом - person Florent; 19.01.2020

Мой скромный взгляд на тему:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Использование:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Функции:

  • Только стандартная библиотека, никаких внешних зависимостей
  • Использует шаблон, предложенный Алексом Мартнелли.
  • start() и stop() безопасно вызывать несколько раз, даже если таймер уже запущен/остановлен
  • вызываемая функция может иметь позиционные и именованные аргументы
  • Вы можете изменить interval в любое время, это вступит в силу после следующего запуска. То же самое для args, kwargs и даже function!
person MestreLion    schedule 31.10.2012
comment
Красивый класс, но есть небольшая проблема, если start() выполняется в цикле. Он может пройти проверку is_running из-за того, что функция _run выполняется в другом потоке. Таким образом, последний self._timer переназначается, и его нельзя остановить. Проверьте мой ответ для правильной версии. - person fdb; 20.09.2013
comment
@fdb: я не уверен, что понял вашу точку зрения. Если вы выполните start() в цикле, используя тот же экземпляр класса, он ничего не сделает. Если вы создадите новый экземпляр, он запустит другой таймер (что позволит вам иметь несколько одновременных таймеров). Что касается многопоточности, да, за исключением того, что каждый start() (или __init__() должен вызываться в одном потоке - person MestreLion; 22.09.2013
comment
Это моя ошибка со словом цикла: я имею в виду быстрый вызов (реализованный с помощью цикла do...loop) функции start(). Достаточно быстро, чтобы быть быстрее, чем установка флага is_running функцией _run(). - person fdb; 30.09.2013

Основываясь на ответе MestreLion, он решает небольшую проблему с многопоточностью:

from threading import Timer, Lock


class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self._lock.release()

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()
person fdb    schedule 19.09.2013
comment
Ха я просто ставил замок в оригинал. Это действительно необходимо. Спасибо, это правильная версия MestreLion. - person Bob Denny; 30.11.2018

Вы можете использовать Расширенный планировщик Python. У него даже есть cron-подобный интерфейс.

person jordixou    schedule 09.11.2012

Используйте Celery.

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something
person user    schedule 22.09.2013

Основываясь на ответе Алекса Мартелли, я реализовал версию decorator, которую легче интегрировать.

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __name__ == '__main__':
    print('start')
    periodic_event()
    print('end')
person northtree    schedule 13.02.2018
comment
Это декораторское решение действительно отличное, но у меня есть предложение по небольшому улучшению: добавьте *args и **kwargs к запланированному вызову функции следующим образом: def decorator(func, *args, **kwargs): def periodic(scheduler, interval, action, actionargs=(), kwargs={}): scheduler.enter(interval, 1, periodic, (scheduler, interval, action, actionargs, kwargs)) action(*actionargs, **kwargs) и ниже periodic(scheduler, interval, func, args, kwargs) Это позволяет планировать функции с параметрами. - person opt12; 25.05.2019
comment
@ opt12 Изменяются ли параметры для каждого расписания? - person northtree; 25.05.2019
comment
Нет, аргументы одинаковы для каждого расписания, но вы можете настроить любую функцию так, чтобы она выполнялась регулярно. Независимо от того, принимают они параметры или нет. Эти аргументы одинаковы для каждого запуска, но, по крайней мере, они могут быть предоставлены в качестве начального аргумента. - person opt12; 26.05.2019

Вот быстрый и грязный неблокирующий цикл с Thread:

#!/usr/bin/env python3
import threading,time

def worker():
    print(time.time())
    time.sleep(5)
    t = threading.Thread(target=worker)
    t.start()


threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")

Ничего особенного, worker сам создает новый поток с задержкой. Может быть, не самый эффективный, но достаточно простой. ответ северного дерева будет подходящим вариантом, если вам нужно более сложное решение.

И на основе этого мы можем сделать то же самое, только с Timer:

#!/usr/bin/env python3
import threading,time

def hello():
    t = threading.Timer(10.0, hello)
    t.start()
    print( "hello, world",time.time() )

t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How's it going?",time.time())
person Sergiy Kolodyazhnyy    schedule 25.05.2018
comment
у вас есть идеи, как добавить проверку, чтобы проверить, жив ли этот поток? Я попытался использовать блок try, кроме блока, и поместить t.start() внутри попытки, но он не запускает поток!. Мне просто нужно запустить isAlive() для t, чтобы проверить, жив ли он. - person toing_toing; 11.02.2019
comment
@toing_toing Честно говоря, я не знаю. Я бы предложил обновлять глобальную переменную потоком или файлом блокировки, если вы находитесь в среде Linux, но это идея, пришедшая мне в голову, и она не основана на передовых методах, а только на знакомстве с Linux. Для такой задачи также можно использовать общую память. Теперь я не очень хорошо разбираюсь в многопоточном программировании, поэтому я бы предложил задать вопрос на сайте и сослаться на этот ответ, чтобы люди могли видеть, с каким кодом вы пытаетесь иметь дело. - person Sergiy Kolodyazhnyy; 12.02.2019

Документ: Расширенный планировщик Python

@sched.cron_schedule(day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

Доступные поля:

| Field       | Description                                                    |
|-------------|----------------------------------------------------------------|
| year        | 4-digit year number                                            |
| month       | month number (1-12)                                            |
| day         | day of the month (1-31)                                        |
| week        | ISO week number (1-53)                                         |
| day_of_week | number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) |
| hour        | hour (0-23)                                                    |
| minute      | minute (0-59)                                                  |
| second      | second (0-59)                                                  |
person Milovan Tomašević    schedule 21.12.2020

Есть новый пакет под названием ischedule. В этом случае решение может быть следующим:

from ischedule import schedule, run_loop
from datetime import timedelta


def query_rate_limit():
    print("query_rate_limit")

schedule(query_rate_limit, interval=60)
run_loop(return_after=timedelta(hours=1))

Все выполняется в основном потоке, и внутри цикла run_loop нет занятого ожидания. Время запуска очень точное, обычно в пределах доли миллисекунды от указанного времени.

person user23952    schedule 07.06.2021

См. мой образец

import sched, time

def myTask(m,n):
  print n+' '+m

def periodic_queue(interval,func,args=(),priority=1):
  s = sched.scheduler(time.time, time.sleep)
  periodic_task(s,interval,func,args,priority)
  s.run()

def periodic_task(scheduler,interval,func,args,priority):
  func(*args)
  scheduler.enter(interval,priority,periodic_task,
                   (scheduler,interval,func,args,priority))

periodic_queue(1,myTask,('world','hello'))
person Vladimir Avdeev    schedule 04.05.2018
comment
Можете ли вы объяснить, почему это лучше, чем другие 8 ответов, которые уже здесь? - person Stephen Rauch; 05.05.2018