使用numpy實現(xiàn)topk函數(shù)操作(并排序)
np.argpartition 難以解決topK
topK是常用的一個功能,在python中,numpy等計算庫使用了豐富的底層優(yōu)化,對于矩陣計算的效率遠(yuǎn)高于python的for-loop實現(xiàn)。因此,我們希望盡量用一些numpy函數(shù)的組合實現(xiàn)topK。
pytorch 庫提供了topk函數(shù),可以將高維數(shù)組沿某一維度(該維度共N項),選出最大(最?。┑腒項并排序。返回排序結(jié)果和index信息。奇怪的是,更輕量級的numpy庫并沒有直接提供 topK 函數(shù)。numpy只提供了argpartition 和 partition,可以將最大(最小)的K項排到前K位。以argpartition為例,最小的3項排到了前3位:
>>> x = np.array([3, 5, 6, 4, 2, 7, 1]) >>> x[np.argpartition(x, 3)] array([2, 1, 3, 4, 5, 7, 6])
注意,argpartition實現(xiàn)的是 partial sorting,如上例,前3項和其余項被分開,但是兩部分各自都是不排序的!而我們可能更想要topK的幾項排好序(其余項則不作要求)。因此,下面提供一種基于argpartition的topK方法。
一個naive方法
最簡單的方法自然是全排序,然后取前K項。缺點在于,要把topK之外的數(shù)據(jù)也進(jìn)行排序,當(dāng)K << N時較為浪費時間,復(fù)雜度為O ( n log n ) O(n \log n)O(nlogn):
def naive_arg_topK(matrix, K, axis=0): """ perform topK based on np.argsort :param matrix: to be sorted :param K: select and sort the top K items :param axis: dimension to be sorted. :return: """ full_sort = np.argsort(matrix, axis=axis) return full_sort.take(np.arange(K), axis=axis) # Example >>> dists = np.random.permutation(np.arange(30)).reshape(6, 5) array([[17, 28, 1, 24, 23, 8], [ 9, 21, 3, 22, 4, 5], [19, 12, 26, 11, 13, 27], [10, 15, 18, 14, 7, 16], [ 0, 25, 29, 2, 6, 20]]) >>> naive_arg_topK(dists, 2, axis=0) array([[4, 2, 0, 4, 1, 1], [1, 3, 1, 2, 4, 0]]) >>> naive_arg_topK(dists, 2, axis=1) array([[2, 5], [2, 4], [3, 1], [4, 0], [0, 3]])
基于partition的方法
對于 np.argpartition 函數(shù),復(fù)雜度可能下降到 O ( n log K ) O(n \log K)O(nlogK),很多情況下,K << N,此時naive方法有優(yōu)化的空間。
以下方法首先選出 topK 項,然后僅對前topK項進(jìn)行排序(matrix僅限2d-array)。
def partition_arg_topK(matrix, K, axis=0): """ perform topK based on np.argpartition :param matrix: to be sorted :param K: select and sort the top K items :param axis: 0 or 1. dimension to be sorted. :return: """ a_part = np.argpartition(matrix, K, axis=axis) if axis == 0: row_index = np.arange(matrix.shape[1 - axis]) a_sec_argsort_K = np.argsort(matrix[a_part[0:K, :], row_index], axis=axis) return a_part[0:K, :][a_sec_argsort_K, row_index] else: column_index = np.arange(matrix.shape[1 - axis])[:, None] a_sec_argsort_K = np.argsort(matrix[column_index, a_part[:, 0:K]], axis=axis) return a_part[:, 0:K][column_index, a_sec_argsort_K] # Example >>> dists = np.random.permutation(np.arange(30)).reshape(6, 5) array([[17, 28, 1, 24, 23, 8], [ 9, 21, 3, 22, 4, 5], [19, 12, 26, 11, 13, 27], [10, 15, 18, 14, 7, 16], [ 0, 25, 29, 2, 6, 20]]) >>> partition_arg_topK(dists, 2, axis=0) array([[4, 2, 0, 4, 1, 1], [1, 3, 1, 2, 4, 0]]) >>> partition_arg_topK(dists, 2, axis=1) array([[2, 5], [2, 4], [3, 1], [4, 0], [0, 3]])
大數(shù)據(jù)量測試
對shape(5000, 100000)的矩陣進(jìn)行topK排序,測試時間為:
K | partition(s) | naive(s) |
---|---|---|
10 | 8.884 | 22.604 |
100 | 9.012 | 22.458 |
1000 | 8.904 | 22.506 |
5000 | 11.305 | 22.844 |
補充:python堆排序?qū)崿F(xiàn)TOPK問題
# 構(gòu)建小頂堆跳轉(zhuǎn)def sift(li, low, higt): tmp = li[low] i = low j = 2 * i + 1 while j <= higt: # 情況2:i已經(jīng)是最后一層 if j + 1 <= higt and li[j + 1] < li[j]: # 右孩子存在并且小于左孩子 j += 1 if tmp > li[j]: li[i] = li[j] i = j j = 2 * i + 1 else: break # 情況1:j位置比tmp小 li[i] = tmp def top_k(li, k): heap = li[0:k] # 建堆 for i in range(k // 2 - 1, -1, -1): sift(heap, i, k - 1) for i in range(k, len(li)): if li[i] > heap[0]: heap[0] = li[i] sift(heap, 0, k - 1) # 挨個輸出 for i in range(k - 1, -1, -1): heap[0], heap[i] = heap[i], heap[0] sift(heap, 0, i - 1) return heap li = [0, 8, 6, 2, 4, 9, 1, 4, 6] print(top_k(li, 3))
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持本站。
版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。