人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動態(tài)

MySQL單表千萬級數(shù)據(jù)處理的思路分享

發(fā)布日期:2022-02-11 10:39 | 文章來源:站長之家

項目背景

在處理過程中,今天上午需要更新A字段,下午爬蟲組完成了規(guī)格書或圖片的爬取又需要更新圖片和規(guī)格書字段,由于單表千萬級深度翻頁會導致處理速度越來越慢。

select a,b,c from db.tb limit 10000 offset 9000000

但是時間是有限的,是否有更好的方法去解決這種問題呢?

改進思路

是否有可以不需要深度翻頁也可以進行數(shù)據(jù)更新的憑據(jù)?
是的,利用自增id列

觀察數(shù)據(jù)特征

此單表有自增id列且為主鍵,根據(jù)索引列查詢數(shù)據(jù)和更新數(shù)據(jù)是最理想的途徑。

select a,b, c from db.tb where id=9999999;
update db.tb set a=x where id=9999999;

多進程處理

每個進程處理一定id范圍內的數(shù)據(jù),這樣既避免的深度翻頁又可以同時多進程處理數(shù)據(jù)。
提高數(shù)據(jù)查詢速度的同時也提高了數(shù)據(jù)處理速度。
下面是我編寫的任務分配函數(shù),供參考:

def mission_handler(all_missions, worker_mission_size):
    """
    根據(jù)總任務數(shù)和每個worker的任務數(shù)計算出任務列表, 任務列表元素為(任務開始id, 任務結束id)。
    例: 總任務數(shù)100個,每個worker的任務數(shù)40, 那么任務列表為:[(1, 40), (41, 80), (81, 100)]
    :param all_missions: 總任務數(shù)
    :param worker_mission_size: 每個worker的最大任務數(shù)
    :return: [(start_id, end_id), (start_id, end_id), ...]
    """
    worker_mission_ids = []
    current_id = 0
    while current_id <= all_missions:
        start_id = all_missions if current_id + 1 >= all_missions else current_id + 1
        end_id = all_missions if current_id + worker_mission_size >= all_missions else current_id + worker_mission_size
        if start_id == end_id:
            if worker_mission_ids[-1][1] == start_id:
                break
        worker_mission_ids.append((start_id, end_id))
        current_id += worker_mission_size
    return worker_mission_ids

假設單表id最大值為100, 然后我們希望每個進程處理20個id,那么任務列表將為:

>>> mission_handler(100, 40)
[(1, 40), (41, 80), (81, 100)]

那么,
進程1將只需要處理id between 1 to 40的數(shù)據(jù);
進程2將只需要處理id between 41 to 80的數(shù)據(jù);
進程3將只需要處理id between 81 to 100的數(shù)據(jù)。

from concurrent.futures import ProcessPoolExecutor

def main():
    # 自增id最大值
    max_id = 30000000
    # 單worker處理數(shù)據(jù)量
    worker_mission_size = 1000000
    # 使用多進程進行處理
    missions = mission_handler(max_id, worker_mission_size)
    workers = []
    executor = ProcessPoolExecutor()
    for idx, mission in enumerate(missions):
        start_id, end_id = mission
        workers.append(executor.submit(data_handler, start_id, end_id, idx))

def data_handler(start_id, end_id, worker_id):
    pass

思路總結

  1. 避免深度翻頁進而使用自增id進行查詢數(shù)據(jù)和數(shù)據(jù)
  2. 使用多進程處理數(shù)據(jù)

數(shù)據(jù)處理技巧

記錄處理成功與處理失敗的數(shù)據(jù)id,以便后續(xù)跟進處理

# 用另外一張表記錄處理狀態(tài)
insert into db.tb_handle_status(row_id, success) values (999, 0);

循環(huán)體內進行異常捕獲,避免程序異常退出

def data_handler(start_id, end_id, worker_id):
    # 數(shù)據(jù)連接
    conn, cursor = mysql()
    current_id = start_id
        try:
            while current_id <= end_id:
                try:
                    # TODO 數(shù)據(jù)處理代碼
                    pass
                except Exception as e:
                    # TODO 記錄處理結果
                    # 數(shù)據(jù)移動到下一條
                    current_id += 1
                    continue
                else:
                    # 無異常,繼續(xù)處理下一條數(shù)據(jù)
                    current_id += 1
        except Exception as e:
            return 'worker_id({}): result({})'.format(worker_id, False)
        finally:
            # 數(shù)據(jù)庫資源釋放
            cursor.close()
            conn.close()
        return 'worker_id({}): result({})'.format(worker_id, True)

更新數(shù)據(jù)庫數(shù)據(jù)盡量使用批量提交

sql = """update db.tb set a=%s, b=%s where id=%s"""
values = [
            ('a_value', 'b_value', 9999),
            ('a_value', 'b_value', 9998),
            ...
         ]
# 批量提交,減少網(wǎng)絡io以及鎖獲取頻率
cursor.executemany(sql, values)

以上就是MySQL單表千萬級數(shù)據(jù)處理的思路分享的詳細內容,更多關于MySQL單表千萬級數(shù)據(jù)處理的資料請關注本站其它相關文章!

國外服務器租用

版權聲明:本站文章來源標注為YINGSOO的內容版權均為本站所有,歡迎引用、轉載,請保持原文完整并注明來源及原文鏈接。禁止復制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務器上建立鏡像,否則將依法追究法律責任。本站部分內容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學習參考,不代表本站立場,如有內容涉嫌侵權,請聯(lián)系alex-e#qq.com處理。

實時開通

自選配置、實時開通

免備案

全球線路精選!

全天候客戶服務

7x24全年不間斷在線

專屬顧問服務

1對1客戶咨詢顧問

在線
客服

在線客服:7*24小時在線

客服
熱線

400-630-3752
7*24小時客服服務熱線

關注
微信

關注官方微信
頂部