如何用python實(shí)現(xiàn)一個(gè)HTTP連接池
首先, HTTP連接是基于TCP連接的, 與服務(wù)器之間進(jìn)行HTTP通信, 本質(zhì)就是與服務(wù)器之間建立了TCP連接后, 相互收發(fā)基于HTTP協(xié)議的數(shù)據(jù)包. 因此, 如果我們需要頻繁地去請(qǐng)求某個(gè)服務(wù)器的資源, 我們就可以一直維持與個(gè)服務(wù)器的TCP連接不斷開, 然后在需要請(qǐng)求資源的時(shí)候, 把連接拿出來用就行了.
一個(gè)項(xiàng)目可能需要與服務(wù)器之間同時(shí)保持多個(gè)連接, 比如一個(gè)爬蟲項(xiàng)目, 有的線程需要請(qǐng)求服務(wù)器的網(wǎng)頁資源, 有的線程需要請(qǐng)求服務(wù)器的圖片等資源, 而這些請(qǐng)求都可以建立在同一條TCP連接上.
因此, 我們使用一個(gè)管理器來對(duì)這些連接進(jìn)行管理, 任何程序需要使用這些連接時(shí), 向管理器申請(qǐng)就可以了, 等到用完之后再將連接返回給管理器, 以供其他程序重復(fù)使用, 這個(gè)管理器就是連接池.
基于上一章的分析, 連接池應(yīng)該是一個(gè)收納連接的容器, 同時(shí)對(duì)這些連接有管理能力:
class HTTPConnectionPool: def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None: ''' :param host: pass :param port: pass :param max_size: 同時(shí)存在的最大連接數(shù), 默認(rèn)None->連接數(shù)無限,沒了就創(chuàng)建 :param idle_timeout: 單個(gè)連接單次最長空閑時(shí)間,超時(shí)自動(dòng)關(guān)閉,默認(rèn)None->不限時(shí) ''' self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這里的conn_num指的是總連接數(shù),包括其它線程拿出去正在使用的連接 self.conn_num = 0 self.is_closed = False def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: ... def release(self, conn: WrapperHTTPConnection) -> None: ...
因此, 我們定義這樣一個(gè)HTTPConnectionPool類, 使用一個(gè)列表來保存可用的連接. 對(duì)于外部來說, 只需要調(diào)用這個(gè)連接池對(duì)象的acquire和release方法就能取得和釋放連接.
2. 線程安全地管理連接對(duì)于線程池內(nèi)部來說, 至少需要三個(gè)關(guān)于連接的操作: 從連接池中取得連接, 將連接放回連接池, 以及創(chuàng)建一個(gè)連接:
def _get_connection(self) -> WrapperHTTPConnection: # 這個(gè)方法會(huì)把連接從_idle_conn移動(dòng)到_used_conn列表中,并返回這個(gè)連接 try: return self._pool.pop() except IndexError: raise EmptyPoolErrordef _put_connection(self, conn: WrapperHTTPConnection) -> None: self._pool.append(conn)def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port))
對(duì)于連接池外部來說, 主要有申請(qǐng)連接和釋放連接這兩個(gè)操作, 實(shí)際上這就是個(gè)簡單的生產(chǎn)者消費(fèi)者模型. 考慮到外部可能是多線程的環(huán)境, 我們使用threading.Condition來保證線程安全. 關(guān)于Condition的資料可以看這里.
def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full(): # 在還能創(chuàng)建新連接的情況下,如果沒有空閑連接,直接創(chuàng)建一個(gè)就行了 if self.is_pool_empty():self._put_connection(self._create_connection()) else: # 不能創(chuàng)建新連接的情況下,如果設(shè)置了blocking=False,沒連接就報(bào)錯(cuò) # 否則,就基于timeout進(jìn)行阻塞,直到超時(shí)或者有可用連接為止 if not blocking:if self.is_pool_empty(): raise EmptyPoolError elif timeout is None:while self.is_pool_empty(): self._lock.wait() elif timeout < 0:raise ValueError('’timeout’ must be a non-negative number') else:end_time = time.time() + timeoutwhile self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子里一定有空閑連接 return self._get_connection()def release(self, conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個(gè)連接是在連接池關(guān)閉后才釋放的,那就不用回連接池了,直接放生 conn.close() return # 實(shí)際上,python列表的append操作是線程安全的,可以不加鎖 # 這里調(diào)用鎖是為了通過notify方法通知其它正在wait的線程:現(xiàn)在有連接可用了 with self._lock: if not conn.is_available: # 如果這個(gè)連接不可用了,就應(yīng)該創(chuàng)建一個(gè)新連接放進(jìn)去,因?yàn)榭赡苓€有其它線程在等著連接用 conn.close() self.conn_num -= 1 conn = self._create_connection() self._put_connection(conn) self._lock.notify()
我們首先看看acquire方法, 這個(gè)方法其實(shí)就是在申請(qǐng)到鎖之后調(diào)用內(nèi)部的_get_connection方法獲取連接, 這樣就線程安全了. 需要注意的是, 如果當(dāng)前的條件無法獲取連接, 就會(huì)調(diào)用條件變量的wait方法, 及時(shí)釋放鎖并阻塞住當(dāng)前線程. 然后, 當(dāng)其它線程作為生產(chǎn)者調(diào)用release方法釋放連接時(shí), 會(huì)觸發(fā)條件變量的notify方法, 從而喚醒一個(gè)阻塞在wait階段的線程, 即消費(fèi)者. 這個(gè)消費(fèi)者再從池中取出剛放回去的線程, 這樣整個(gè)生產(chǎn)者消費(fèi)者模型就運(yùn)轉(zhuǎn)起來了.
3. 上下文管理器對(duì)于一個(gè)程序來說, 它使用連接池的形式是獲取連接->使用連接->釋放連接. 因此, 我們應(yīng)該通過with語句來管理這個(gè)連接, 以免在程序的最后遺漏掉釋放連接這一步驟.
基于這個(gè)原因, 我們通過一個(gè)WrapperHTTPConnection類來對(duì)HTTPConnection進(jìn)行封裝, 以實(shí)現(xiàn)上下文管理器的功能. HTTPConnection的代碼可以看《用python實(shí)現(xiàn)一個(gè)HTTP客戶端》這篇文章.
class WrapperHTTPConnection: def __init__(self, pool: ’HTTPConnectionPool’, conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.is_available = True def __enter__(self) -> ’WrapperHTTPConnection’: return self def __exit__(self, *exit_info: Any) -> None: # 如果response沒讀完并且連接需要復(fù)用,就棄用這個(gè)連接 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self, *args: Any, **kwargs: Any) -> HTTPResponse: self.conn.request(*args, **kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = False
同樣的, 連接池可能也需要關(guān)閉, 因此我們給連接池也加上上下文管理器的功能:
class HTTPConnectionPool: ... def close(self) -> None: if self.is_closed: return self.is_closed = True pool, self._pool = self._pool, None for conn in pool: conn.close() def __enter__(self) -> ’HTTPConnectionPool’: return self def __exit__(self, *exit_info: Any) -> None: self.close()
這樣, 我們就可以通過with語句優(yōu)雅地管理連接池了:
with HTTPConnectionPool(**kwargs) as pool: with pool.acquire() as conn: res = conn.request(’GET’, ’/’) ...4. 定時(shí)清理連接
如果一個(gè)連接池的所需連接數(shù)是隨時(shí)間變化的, 那么就會(huì)出現(xiàn)一種情況: 在高峰期, 我們創(chuàng)建了非常多的連接, 然后進(jìn)入低谷期之后, 連接過剩, 大量的連接處于空閑狀態(tài), 浪費(fèi)資源. 因此, 我們可以設(shè)置一個(gè)定時(shí)任務(wù), 定期清理空閑時(shí)間過長的連接, 減少連接池的資源占用.
首先, 我們需要為連接對(duì)象添加一個(gè)last_time屬性, 每當(dāng)連接釋放進(jìn)入連接池后, 就修改這個(gè)屬性的值為當(dāng)前時(shí)間, 這樣我們就能明確知道, 連接池內(nèi)的每個(gè)空閑連接空閑了多久:
class WrapperHTTPConnection: ... def __init__(self, pool: ’HTTPConnectionPool’, conn: HTTPConnection) -> None: ... self.last_time = Noneclass HTTPConnectionPool: ... def _put_connection(self, conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn)
然后, 我們通過threading.Timer來實(shí)現(xiàn)一個(gè)定時(shí)任務(wù):
def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閑連接的超時(shí)時(shí)間為無限,那么就不應(yīng)該清理連接 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn) self._clearer.start()def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel()
threading.Timer只會(huì)執(zhí)行一次定時(shí)任務(wù), 因此, 我們需要在start_clear_conn中不斷地把自己設(shè)置為定時(shí)任務(wù). 這其實(shí)等同于新開了一個(gè)線程來執(zhí)行start_clear_conn方法, 因此并不會(huì)出現(xiàn)遞歸過深問題. 不過需要注意的是, threading.Timer雖然不會(huì)阻塞當(dāng)前線程, 但是卻會(huì)阻止當(dāng)前線程結(jié)束, 就算把它設(shè)置為守護(hù)線程都不行, 唯一可行的辦法就是調(diào)用stop_clear_conn方法取消這個(gè)定時(shí)任務(wù).
最后, 我們定義clear_idle_conn方法來清理閑置時(shí)間超時(shí)的連接:
def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這里開一個(gè)新線程來清理空閑連接,避免了阻塞主線程導(dǎo)致的定時(shí)精度出錯(cuò) threading.Thread(target=self._clear_idle_conn).start()def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因?yàn)槭敲扛魋elf.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請(qǐng)到鎖,下一次都開始了,本次也就不用繼續(xù)了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這里處理下面的二分法沒法處理的邊界情況,即所有連接都閑置超時(shí)的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個(gè)不超時(shí)的連接的指針 left, right = 0, len(self._pool) - 1 while left < right: mid = (left + right) // 2 if current_time - self._pool[mid].last_time >= self.idle_timeout:left = mid + 1 else:right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release()
由于我們獲取和釋放連接都是從self._pool的尾部開始操作的, 因此self._pool這個(gè)容器是一個(gè)先進(jìn)后出隊(duì)列, 它里面放著的連接, 一定是越靠近頭部的閑置時(shí)間越長, 從頭到尾閑置時(shí)間依次遞減. 基于這個(gè)原因, 我們使用二分法來找出列表中第一個(gè)沒有閑置超時(shí)的連接, 然后把在它之前的連接一次性刪除, 這樣就能達(dá)到O(logN)的時(shí)間復(fù)雜度, 算是一種比較高效的方法. 需要注意的是, 如果連接池內(nèi)所有的連接都是超時(shí)的, 那么這種方法是刪不干凈的, 需要對(duì)這種邊界情況單獨(dú)處理.
三. 總結(jié)1. 完整代碼及分析這個(gè)連接池的完整代碼如下:
import threadingimport timefrom typing import Anyfrom client import HTTPConnection, HTTPResponseclass WrapperHTTPConnection: def __init__(self, pool: ’HTTPConnectionPool’, conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.last_time = time.time() self.is_available = True def __enter__(self) -> ’WrapperHTTPConnection’: return self def __exit__(self, *exit_info: Any) -> None: # 如果response沒讀完并且連接需要復(fù)用,就棄用這個(gè)連接 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self, *args: Any, **kwargs: Any) -> HTTPResponse: self.conn.request(*args, **kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = Falseclass HTTPConnectionPool: def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None: ''' :param host: pass :param port: pass :param max_size: 同時(shí)存在的最大連接數(shù), 默認(rèn)None->連接數(shù)無限,沒了就創(chuàng)建 :param idle_timeout: 單個(gè)連接單次最長空閑時(shí)間,超時(shí)自動(dòng)關(guān)閉,默認(rèn)None->不限時(shí) ''' self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這里的conn_num指的是總連接數(shù),包括其它線程拿出去正在使用的連接 self.conn_num = 0 self.is_closed = False self._clearer = None self.start_clear_conn() def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full():# 在還能創(chuàng)建新連接的情況下,如果沒有空閑連接,直接創(chuàng)建一個(gè)就行了if self.is_pool_empty(): self._put_connection(self._create_connection()) else:# 不能創(chuàng)建新連接的情況下,如果設(shè)置了blocking=False,沒連接就報(bào)錯(cuò)# 否則,就基于timeout進(jìn)行阻塞,直到超時(shí)或者有可用連接為止if not blocking: if self.is_pool_empty(): raise EmptyPoolErrorelif timeout is None: while self.is_pool_empty(): self._lock.wait()elif timeout < 0: raise ValueError('’timeout’ must be a non-negative number')else: end_time = time.time() + timeout while self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子里一定有空閑連接 return self._get_connection() def release(self, conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個(gè)連接是在連接池關(guān)閉后才釋放的,那就不用回連接池了,直接放生 conn.close() return # 實(shí)際上,python列表的append操作是線程安全的,可以不加鎖 # 這里調(diào)用鎖是為了通過notify方法通知其它正在wait的線程:現(xiàn)在有連接可用了 with self._lock: if not conn.is_available:# 如果這個(gè)連接不可用了,就應(yīng)該創(chuàng)建一個(gè)新連接放進(jìn)去,因?yàn)榭赡苓€有其它線程在等著連接用conn.close()self.conn_num -= 1conn = self._create_connection() self._put_connection(conn) self._lock.notify() def _get_connection(self) -> WrapperHTTPConnection: # 這個(gè)方法會(huì)把連接從_idle_conn移動(dòng)到_used_conn列表中,并返回這個(gè)連接 try: return self._pool.pop() except IndexError: raise EmptyPoolError def _put_connection(self, conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn) def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port)) def is_pool_empty(self) -> bool: # 這里指的是,空閑可用的連接是否為空 return len(self._pool) == 0 def is_full(self) -> bool: if self.max_size is None: return False return self.conn_num >= self.max_size def close(self) -> None: if self.is_closed: return self.is_closed = True self.stop_clear_conn() pool, self._pool = self._pool, None for conn in pool: conn.close() def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這里開一個(gè)新線程來清理空閑連接,避免了阻塞主線程導(dǎo)致的定時(shí)精度出錯(cuò) threading.Thread(target=self._clear_idle_conn).start() def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因?yàn)槭敲扛魋elf.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請(qǐng)到鎖,下一次都開始了,本次也就不用繼續(xù)了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這里處理下面的二分法沒法處理的邊界情況,即所有連接都閑置超時(shí)的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個(gè)不超時(shí)的連接的指針 left, right = 0, len(self._pool) - 1 while left < right:mid = (left + right) // 2if current_time - self._pool[mid].last_time >= self.idle_timeout: left = mid + 1else: right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release() def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閑連接的超時(shí)時(shí)間為無限,那么就不應(yīng)該清理連接 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn) self._clearer.start() def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel() def __enter__(self) -> ’HTTPConnectionPool’: return self def __exit__(self, *exit_info: Any) -> None: self.close()class EmptyPoolError(Exception): passclass ConnectionPoolClosed(Exception): pass
首先, 這個(gè)連接池的核心就是對(duì)連接進(jìn)行管理, 而這包含取出連接和釋放連接兩個(gè)過程. 因此這東西的本質(zhì)就是一個(gè)生產(chǎn)者消費(fèi)者模型, 取出線程時(shí)是消費(fèi)者, 放入線程時(shí)是生產(chǎn)者, 使用threading自帶的Condition對(duì)象就能完美解決線程安全問題, 使二者協(xié)同合作.
解決獲取連接和釋放連接這個(gè)問題之后, 其實(shí)這個(gè)連接池就已經(jīng)能用了. 但是如果涉及到更多細(xì)節(jié)方面的東西, 比如判斷連接是否可用, 自動(dòng)釋放連接, 清理閑置連接等等, 就需要對(duì)這個(gè)連接進(jìn)行封裝, 為它添加更多的屬性和方法, 這就引入了WrapperHTTPConnection這個(gè)類. 實(shí)現(xiàn)它的__enter___和__exit__方法之后, 就能使用上下文管理器來自動(dòng)釋放連接. 至于清理閑置連接, 通過last_time屬性記錄每個(gè)連接的最后釋放時(shí)間, 然后在連接池中添加一個(gè)定時(shí)任務(wù)就行了.
以上就是如何用python實(shí)現(xiàn)一個(gè)HTTP連接池的詳細(xì)內(nèi)容,更多關(guān)于python 實(shí)現(xiàn)一個(gè)HTTP連接池的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. 告別AJAX實(shí)現(xiàn)無刷新提交表單2. XML解析錯(cuò)誤:未組織好 的解決辦法3. CSS3使用過度動(dòng)畫和緩動(dòng)效果案例講解4. ASP不能打開注冊(cè)表關(guān)鍵字錯(cuò)誤 "80004005"的解決方法5. 讀大數(shù)據(jù)量的XML文件的讀取問題6. IE6/IE7/IE8/IE9中tbody的innerHTML不能賦值的完美解決方案7. 詳解CSS偽元素的妙用單標(biāo)簽之美8. ASP實(shí)現(xiàn)加法驗(yàn)證碼9. 三個(gè)不常見的 HTML5 實(shí)用新特性簡介10. XHTML 1.0:標(biāo)記新的開端
