python更加靈活的Logger日志詳解
用到的4個類:
1、Logger:
打印日志用的對象;
設(shè)置日志等級,添加移除handler,添加移除filter,設(shè)置下級Logger,使用各種方法打印日志;
創(chuàng)建方式有兩種,使用logging.getLogger("mylog")和創(chuàng)建實例logging.Logger("mylog");
推薦使用getLogger()的方式,不傳name參數(shù)的獲得的Logger為rootLogger,傳name參數(shù)的上級為rootLogger;
直接實例化的Logger沒有上級,日志等級為NOTSET,并且用該實例創(chuàng)建的下級Logger的上級直接為rootLogger;
使用getLogger(name)傳入相同的名字獲得是同一個Logger;
Logger擁有上下級關(guān)系,父子記錄器名稱之間用“.”分割,Logger擁有有效日志等級概念(getEffectiveLevel()方法返回的是日志等級整數(shù)值),直接新建實例創(chuàng)建的Logger日志等級為NOTSET,默認的根記錄器為WARNING等級,如果NOTSET等級的Logger為根記錄器則處理所有消息,否則會委托給父級記錄器,遍歷父記錄器鏈,直到找到非NOTSET的父記錄器并把該等級作為自己的有效等級,如果直到根記錄器也是NOTSET則處理所有消息;
2、Formatter:
日志打印格式;
創(chuàng)建方式為logging.Formatter(fmt=None, datefmt=None, style='%', validate=True)
fmt:格式化日志
datefmt:格式化fmt中的%(asctime)s
style:在3.2版本添加
validate:在3.8版本添加,不正確或不匹配的樣式和fmt將引發(fā)ValueError;
跟上一篇一致python的logging日志模塊
3、Filter:
過濾日志;
通過繼承l(wèi)ogging.Filter類,并實現(xiàn) filter(self, record: LogRecord) -> int方法,返回0或者False表示不通過,返回非0或者True表示可以通過;(from logging import LogRecord)
LogRecord可以操作的屬性大概有這么多:
同一個Handler可以添加多個Filter,依次過濾,當前Filter通過后傳遞給下一個Filter;
4、Handler:
日志輸出方式;
設(shè)置日志等級,設(shè)置formatter,添加移除filter;
Logger可以添加多個handler,將日志按不同格式和過濾送往不同的地方,同一個Logger只會添加同一個handler一次,多次添加無影響;
子Logger處理完自己的handler后,會將日志傳遞給父Logger的handler處理,依次向上傳遞,不要將同一個handler同時添加到父子Logger里,否則父子Logger都會處理會打印多次相同日志;
可以通過設(shè)置Logger對象的propagate屬性為False關(guān)閉傳遞給父Logger的handler;
大概有這么多Handler:
(from logging import handlers)
StreamHandler:
構(gòu)造方法 logging.StreamHandler(stream=None)
將日志發(fā)送到像sys.stdout、sys.stderr或類似文件的對象中(支持write()和flush()方法的對象),默認使用sys.stderr,3.2版本后還有個terminator屬性,可以設(shè)置終止符(默認“\n”);
FileHandler:
構(gòu)造方法logging.FileHandler(filename, mode='a', encoding=None, delay=False)
將日志記錄到文件中,默認文件將無限增長,如果delay為true,打開文件會延遲到第一次調(diào)用emit();
3.6版本開始,pathlib.Path也可以作為filename的值
NullHandler:
不做任何格式化和輸出,提供給庫開發(fā)人員使用;
WatchedFileHandler:
這是一個FileHandler,監(jiān)控正在記錄日志的文件,如果文件變動了則關(guān)閉文件重新打開;(windows系統(tǒng)不適用)
BaseRotatingHandler:
構(gòu)造方法logging.handlers.BaseRotatingHandler(filename, mode, encoding=None, delay=False)
是RotatingFileHandler和TimedRotatingFileHandler的基類,不需要實例化該類;
RotatingFileHandler:
構(gòu)造方法logging.handlers.RotatingFileHandler(filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False)
按照日志文件大小和備份日志文件數(shù)量保存日志到文件;
maxBytes或者backupCount為0則不會滾動日志,當日志大小接近maxBytes時會用“.1”“.2”“.3”...后綴保存舊文件,并保持舊文件數(shù)量不超過backupCount,當前日志一直是沒有后綴的那個文件;
TimedRotatingFileHandler:
構(gòu)造方法logging.handlers.TimedRotatingFileHandler(filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None)
按照時間間隔和備份文件數(shù)量保存日志到文件;
when:“S”秒,“M”分鐘,“H”小時,“D”天,“W0”-“W6”工作日(W0為周一),“midnight”午夜(不指定atTime的時候午夜12點,否則按照atTime的時間滾動)
interval:時間間隔;
backupCount:保留文件數(shù)量上限;
utc:為true時文件名后綴使用UTC時間否則使用本地時間;
atTime:3.4添加,datetime.time類型,指定一天中發(fā)生滾動日志的時間,只有when為“midnight”或者“W0”-“W6”時有效;
生成的備份文件文件名后綴格式為%Y-%m-%d_%H-%M-%S,第一次計算滾動時間(程序啟動后)時會使用現(xiàn)有日志的最后修改時間或者當前時間計算;
SocketHandler:
構(gòu)造方法logging.handlers.SocketHandler(host, port)
將日志發(fā)送到網(wǎng)絡(luò)套接字,基類使用的是TCP;
發(fā)送的二進制bytes是由編碼的,由“內(nèi)容長度+內(nèi)容”組成,內(nèi)容是用pickle序列化的一個dict(LogRecord,可以用logging.makeLogRecord(attrdict)轉(zhuǎn)換),長度是用struct將序列化后的內(nèi)容打包的一個大端無符號long數(shù)值,具體邏輯在SocketHandler類源碼的makePickle()方法中有體現(xiàn):
搞了半天才搞好的小demo(之前沒看到編碼方式還以為socket的編碼,服務(wù)端接收數(shù)據(jù)解不出):
#!/usr/bin/env python3 # coding=utf-8 import logging from logging import handlers import time log = logging.getLogger("mylog") log.setLevel(logging.DEBUG) h = logging.handlers.SocketHandler(host="127.0.0.1", port=8899) f = logging.Formatter("[%(name)s][%(asctime)s]%(message)s") h.setFormatter(f) log.addHandler(h) for x in range(1, 6): log.info("test log %d" % x) time.sleep(0.5)
服務(wù)端demo:
#!/usr/bin/env python3 # coding=utf-8 import socket import pickle import struct import logging def unPickle(bs: bytes): data_len_bytes = bs[0:4] data_len = struct.unpack(">L", data_len_bytes)[0] pickled_data = bs[4: data_len + 4 + 1] return pickle.loads(pickled_data) socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_server.bind(("127.0.0.1", 8899)) socket_server.listen(3) while True: try: client, client_addr = socket_server.accept() while True: data = client.recv(1024 * 10) if data != b'': log_dict = unPickle(data) log_record = logging.makeLogRecord(log_dict) print("recv log:", log_record) except Exception as e: print("Exception:", e)
先運行socket服務(wù),再測日志,因為連接不到服務(wù)日志會被丟棄,運行結(jié)果:
DatagramHandler:
構(gòu)造方法logging.handlers.DatagramHandler(host, port)
繼承了SocketHandler,使用UDP發(fā)送,類似SocketHandler的使用;
SysLogHandler:
構(gòu)造方法logging.handlers.SysLogHandler(address=('localhost', SYSLOG_UDP_PORT), facility=LOG_USER, socktype=socket.SOCK_DGRAM)
將日志發(fā)送到遠程或者本地的Unix系統(tǒng)日志,未指定address則使用('localhost', 514)地址;
NTEventLogHandler:
構(gòu)造方法logging.handlers.NTEventLogHandler(appname, dllname=None, logtype='Application')
將日志發(fā)送到本地的WindowsNT、Windows 2000、WindowsXP系統(tǒng)日志,使用時需要Mark Hammond's Win32的python擴展;
SMTPHandler:
構(gòu)造方法logging.handlers.SMTPHandler(mailhost, fromaddr, toaddrs, subject, credentials=None, secure=None, timeout=1.0)
將日志發(fā)送到電子郵件;
mailhost使用(host, port)元組,toaddrs是一個字符串列表,credentials可以用(username, password)元組指定用戶名密碼
MemoryHandler:
構(gòu)造方法logging.handlers.MemoryHandler(capacity, flushLevel=ERROR, target=None, flushOnClose=True)
支持在內(nèi)存中緩沖日志,當緩存將滿或者出現(xiàn)某種嚴重情況時把日志發(fā)送到目標handler;
是BufferingHandler的子類;
每當向緩沖區(qū)添加日志的時候都會調(diào)用shouldFlush()判斷是否需要刷新,如果需要則會調(diào)用flush()刷新;
HTTPHandler:
構(gòu)造方法logging.handlers.HTTPHandler(host, url, method='GET', secure=False, credentials=None, context=None)
通過GET或者POST向一個web服務(wù)器發(fā)送日志;
如果要指定端口,host可以使用host:port值;
secure為true,則使用HTTPS;
對HTTPHandler使用setFormatter()是無效的,HTTPHandler沒有調(diào)用format()格式化,而是調(diào)用了mapLogRecord()方法然后使用urllib.parse.urlencode()編碼的;
mapLogRecord()函數(shù)很簡單(有需要可以重寫):
而LogRecord()里是沒有asctime字段的,所以log.asctime是錯誤的,但是logRecord里有created和msecs字段表時間:
小小的demo:
#!/usr/bin/env python3 # coding=utf-8 import logging from logging import handlers import time log = logging.getLogger("mylog") log.setLevel(logging.DEBUG) h_get = handlers.HTTPHandler(host="127.0.0.1:8080", url="test_http_log", method="GET", secure=False, credentials=None, context=None) log.addHandler(h_get) h_post = handlers.HTTPHandler(host="127.0.0.1:8080", url="test_http_log", method="POST", secure=False, credentials=None, context=None) log.addHandler(h_post) h = logging.StreamHandler() f = logging.Formatter("[%(levelname)s][%(asctime)s]%(message)s") h.setFormatter(f) log.addHandler(h) for x in range(1, 3): log.info("test HTTP log %d" % x) time.sleep(0.5)
http服務(wù)端:
#!/usr/bin/env python3 # coding=utf-8 from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler from urllib import parse import logging import time class MyHttpServer(BaseHTTPRequestHandler): def do_GET(self): url = parse.urlparse(self.path) q = parse.parse_qs(url.query) log = logging.makeLogRecord(q) log_created = time.localtime(float(log.created[0])) format_time = time.strftime("%Y/%m/%d %H:%M:%S", log_created) print("-GET:", log.name, log.levelname, log.msg, format_time+","+str(int(float(log.msecs[0])//1))) self.send_response(200) self.end_headers() self.wfile.flush() def do_POST(self): length = int(self.headers["content-length"]) data = self.rfile.read(length) data = data.decode(encoding="utf-8") data_dict = parse.parse_qs(data) data = logging.makeLogRecord(data_dict) print("=POST:", data) self.send_response(200) self.end_headers() self.wfile.flush() httpserver = ThreadingHTTPServer(("127.0.0.1", 8080), MyHttpServer) httpserver.serve_forever()
先運行服務(wù)端再跑日志demo,運行結(jié)果:
QueueHandler/QueueListener:
構(gòu)造方法logging.handlers.QueueHandler(queue)
logging.handlers.QueueListener(queue, *handlers, respect_handler_level=False)
將日志發(fā)送到隊列,用于處理隊列或者多線程模塊情況;
queue可以是類隊列的任何對象(原樣傳給dequeue()函數(shù)),也可以用queue.SimpleQueue代替queue;
QueueHandler的小demo:
#!/usr/bin/env python3 # coding=utf-8 import logging from logging import handlers from logging import LogRecord import queue class MyLogHandler(logging.Handler): def handle(self, record: LogRecord) -> None: print(record) log = logging.getLogger("mylog") log.setLevel(logging.DEBUG) q = queue.SimpleQueue() h = handlers.QueueHandler(q) f = logging.Formatter("[%(levelname)s]%(message)s") h.setFormatter(f) log.addHandler(h) listener = handlers.QueueListener(q, MyLogHandler(), respect_handler_level=False) listener.start() print("listener started") log.info("test queue log1 info") log.error("test queue log2 error") listener.stop() print("listener stopted") log.info("test queue log3 info") log.error("test queue log4 error") print("test end")
執(zhí)行結(jié)果:
常用Demo:
#!/usr/bin/env python3 # coding=utf-8 import logging from logging import LogRecord from logging import handlers import sys # log1 = logging.Logger("a", logging.DEBUG) log1 = logging.getLogger("a") _log1 = logging.getLogger("a") print(log1 is _log1, id(log1), id(_log1)) log1.setLevel(logging.DEBUG) log2 = log1.getChild("b") log3 = log1.getChild("c") print(log1) print(log2) print(log3) h1 = logging.StreamHandler() format1 = logging.Formatter("[H1] [%(levelname)-8s] %(message)s") h1.setFormatter(format1) log1.addHandler(h1) h2 = logging.StreamHandler() format2 = logging.Formatter(fmt="[H2][%(levelname)7s][%(asctime)s]%(message)s", datefmt="%c") h2.setFormatter(format2) class myFilter(logging.Filter): def filter(self, record: LogRecord) -> int: print("record:" + repr(record)) if "HELLO" in record.msg: return True else: return False h2.addFilter(myFilter()) log2.addHandler(h2) # log2.addFilter(myFilter()) h3 = logging.StreamHandler() format3 = logging.Formatter("[H3]{%(levelname)s}{%(name)s}%(message)s") h3.setFormatter(format3) h3.setStream(sys.stdout) h3.setLevel(logging.INFO) log3.addHandler(h3) # h3_file = logging.FileHandler(filename="test_Logging2.log", mode="w") h3_file.setFormatter(logging.Formatter("[H3File][%(levelname)8s][%(asctime)s]%(message)s")) log3.addHandler(h3_file) # h3_rotatingfile = handlers.RotatingFileHandler(filename="test_Logging2_rotating.log", mode="a", maxBytes=256, backupCount=3, encoding="utf-8", delay=False) h3_rotatingfile.setFormatter(logging.Formatter("[h3_rotating][%(levelname)s] %(message)s")) log3.addHandler(h3_rotatingfile) # h3_timedrotationgfile = handlers.TimedRotatingFileHandler(filename="test_Logging2_timedrotating.log", when="S", interval=5, backupCount=5, encoding="utf-8", delay=False, utc=False, # atTime= ) h3_timedrotationgfile.setFormatter(logging.Formatter("[H3timed][%(levelname)s]%(message)s")) log3.addHandler(h3_timedrotationgfile) log3.propagate = False log1.debug("test log1") log1.warning("warn log1") log2.debug("test log2 HELLO") log2.info("info log2") log2.warning("warn log2 HELLO") log2.error("error log2") log3.debug("test log3") log3.warning("warn log3")
多次執(zhí)行結(jié)果中的一次:
生成的日志文件:
參考:
logging — Logging facility for Python — Python 3.8.12 documentation
logging.handlers — Logging handlers — Python 3.8.12 documentation
Python3 日志(內(nèi)置logging模塊) - 天馬行宇 - 博客園
到此這篇關(guān)于python更加靈活的Logger日志的文章就介紹到這了,更多相關(guān)python更加靈活的Logger日志內(nèi)容請搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!
版權(quán)聲明:本站文章來源標注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學習參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。