人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動(dòng)態(tài)

Python使用signal定時(shí)結(jié)束AsyncIOScheduler任務(wù)的問題

發(fā)布日期:2022-02-21 15:28 | 文章來(lái)源:腳本之家

在使用aiohttp結(jié)合apscheduler的AsyncIOScheduler模擬定點(diǎn)并發(fā)的時(shí)候遇到兩個(gè)問題

  1. 在調(diào)度器scheduler.start()后,程序直接退出(在Jupiter中任務(wù)可以正常啟動(dòng))
  2. 如何在指定時(shí)間調(diào)用scheduler.shutdown()? (因?yàn)槌绦蛑苯油顺隽耍?br/>

原調(diào)試代碼如下:

from datetime import datetime, timedelta
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
 url = 'https://httpbin.org/get?a=1'
 async with session.get(url) as res:
  print('get', res.status)
  return await res.text()
async def post(session):
 url = 'https://httpbin.org/post?b=2'
 async with session.post(url) as res:
  print('post', res.status)
  return await res.text()
async def main():
 async with aiohttp.ClientSession() as session:
  await get(session)
  await post(session)
if __name__ == '__main__':
 jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
 scheduler = AsyncIOScheduler(jobstores=jobstores)
 for i in range(10):  # 添加10個(gè)任務(wù)
  job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
 scheduler.start()

Google后發(fā)現(xiàn)AsyncIOScheduler的使用需要在scheduler啟動(dòng)后,需要自己調(diào)用asyncio.get_event_loop().run_forever()來(lái)啟動(dòng)協(xié)程任務(wù)。
但是一旦run_forever()則就會(huì)阻塞至死。除非有KeyboardInterrupt, SystemExit等異?;蛘邚?qiáng)殺來(lái)停止其運(yùn)行。
此時(shí)想到使用Python的signal來(lái)定時(shí)發(fā)送信號(hào),修改后程序如下,可以正常延遲停止(感覺有點(diǎn)像模擬Go的defer)。

# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
 url = 'https://httpbin.org/get?a=1'
 async with session.get(url) as res:
  print('get', res.status)
  return await res.text()
async def post(session):
 url = 'https://httpbin.org/post?b=2'
 async with session.post(url) as res:
  print('post', res.status)
  return await res.text()
async def main():
 async with aiohttp.ClientSession() as session:
  await get(session)
  await post(session)
if __name__ == '__main__':
 jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
 scheduler = AsyncIOScheduler(jobstores=jobstores)
 for i in range(10):  # 添加10個(gè)任務(wù)
  job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
 scheduler.start()
 signal.alarm(20)  # 20秒后終止程序
 asyncio.get_event_loop().run_forever()  # 永遠(yuǎn)運(yùn)行

到此這篇關(guān)于Python使用signal定時(shí)結(jié)束AsyncIOScheduler任務(wù)的文章就介紹到這了,更多相關(guān)Python定時(shí)結(jié)束AsyncIOScheduler任務(wù)內(nèi)容請(qǐng)搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!

香港服務(wù)器租用

版權(quán)聲明:本站文章來(lái)源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來(lái)源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來(lái)源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來(lái),僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。

相關(guān)文章

實(shí)時(shí)開通

自選配置、實(shí)時(shí)開通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問服務(wù)

1對(duì)1客戶咨詢顧問

在線
客服

在線客服:7*24小時(shí)在線

客服
熱線

400-630-3752
7*24小時(shí)客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部