python - 這個錯誤要怎么改
問題描述
源代碼
#!/bin/env python3# coding:utf-8'''ljk 20161116(update 20170217)This script should be put in crontab in every web server.Execute every n minutes.Collect nginx access log, process it and insert the result into mysql.'''import osimport reimport subprocessimport timeimport warningsimport pymysqlfrom sys import argv, exitfrom socket import gethostnamefrom urllib.parse import unquotefrom zlib import crc32from multiprocessing import Pool##### 自定義部分 ###### 定義日志格式,利用非貪婪匹配和分組匹配,需要嚴格參照日志定義中的分隔符和引號log_pattern = r’^(?P<remote_addr>.*?) - [(?P<time_local>.*?)] '(?P<request>.*?)'’ r’ (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)’ r’ '(?P<http_referer>.*?)' '(?P<http_user_agent>.*?)' - (?P<http_x_forwarded_for>.*)$’# request的正則,其實是由 'request_method request_uri server_protocol'三部分組成request_uri_pattern = r’^(?P<request_method>(GET|POST|HEAD|DELETE)?) (?P<request_uri>.*?) (?P<server_protocol>HTTP.*)$’# 日志目錄log_dir = ’/nginx_log/’# 要處理的站點(可隨需要想list中添加)todo = [’www’, ’news’, ’m.api’,]# MySQL相關設置mysql_host = ’xxxx’mysql_user = ’xxxx’mysql_passwd = ’xxxx’mysql_port = ’xxxx’mysql_database = ’xxxx’# 表結構creat_table = 'CREATE TABLE IF NOT EXISTS {} (id bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY,server char(11) NOT NULL DEFAULT ’’,uri_abs varchar(200) NOT NULL DEFAULT ’’ COMMENT ’對$uri做uridecode,然后做抽象化處理’,uri_abs_crc32 bigint unsigned NOT NULL DEFAULT ’0’ COMMENT ’對上面uri_abs字段計算crc32’,args_abs varchar(200) NOT NULL DEFAULT ’’ COMMENT ’對$args做uridecode,然后做抽象化處理’,args_abs_crc32 bigint unsigned NOT NULL DEFAULT ’0’ COMMENT ’對上面args字段計算crc32’,time_local timestamp NOT NULL DEFAULT ’0000-00-00 00:00:00’,response_code smallint NOT NULL DEFAULT ’0’,bytes_sent int NOT NULL DEFAULT ’0’ COMMENT ’發送給客戶端的響應大小’,request_time float(6,3) NOT NULL DEFAULT ’0.000’,user_ip varchar(40) NOT NULL DEFAULT ’’,cdn_ip varchar(15) NOT NULL DEFAULT ’’ COMMENT ’CDN最后節點的ip:空字串表示沒經過CDN; - 表示沒經過CDN和F5’,request_method varchar(7) NOT NULL DEFAULT ’’,uri varchar(255) NOT NULL DEFAULT ’’ COMMENT ’$uri,已做uridecode’,args varchar(255) NOT NULL DEFAULT ’’ COMMENT ’$args,已做uridecode’,referer varchar(255) NOT NULL DEFAULT ’’ COMMENT ’’,KEY time_local (time_local),KEY uri_abs_crc32 (uri_abs_crc32),KEY args_abs_crc32 (args_abs_crc32) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 row_format=compressed'##### 自定義部分結束 ###### 主機名global serverserver = gethostname()# 今天零點global today_starttoday_start = time.strftime(’%Y-%m-%d’, time.localtime()) + ’ 00:00:00’# 將pymysql對于操作中的警告信息轉為可捕捉的異常warnings.filterwarnings(’error’, category=pymysql.err.Warning)def my_connect(): '''鏈接數據庫''' global connection, con_cur try:connection = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_passwd, charset=’utf8mb4’, port=mysql_port, autocommit=True, database=mysql_database) except pymysql.err.MySQLError as err:print(’Error: ’ + str(err))exit(20) con_cur = connection.cursor()def create_table(t_name): '''創建各站點對應的表''' my_connect() try:con_cur.execute(creat_table.format(t_name)) except pymysql.err.Warning:passdef process_line(line_str): ''' 處理每一行記錄 line_str: 該行數據的字符串形式 ''' processed = log_pattern_obj.search(line_str) if not processed:’’’如果正則根本就無法匹配該行記錄時’’’print('Can’t process this line: {}'.format(line_str))return server, ’’, 0, ’’, 0, ’’, ’’, ’’, ’’, ’’, ’’ else:# remote_addr (客戶若不經過代理,則可認為用戶的真實ip)remote_addr = processed.group(’remote_addr’)# time_localtime_local = processed.group(’time_local’)# 轉換時間為mysql date類型ori_time = time.strptime(time_local.split()[0], ’%d/%b/%Y:%H:%M:%S’)new_time = time.strftime(’%Y-%m-%d %H:%M:%S’, ori_time)# 處理uri和argsrequest = processed.group(’request’)request_further = request_uri_pattern_obj.search(request)if request_further: request_method = request_further.group(’request_method’) request_uri = request_further.group(’request_uri’) uri_args = request_uri.split(’?’, 1) # 對uri和args進行urldecode uri = unquote(uri_args[0]) args = ’’ if len(uri_args) == 1 else unquote(uri_args[1]) # 對uri和args進行抽象化 uri_abs = text_abstract(uri, ’uri’) args_abs = text_abstract(args, ’args’) # 對庫里的uri_abs和args_abs字段進行crc32校驗 uri_abs_crc32 = crc32(uri_abs.encode()) args_abs_crc32 = 0 if args_abs == ’’ else crc32(args_abs.encode())else: print(’$request abnormal: {}’.format(line_str)) request_method = ’’ uri = request uri_abs = ’’ uri_abs_crc32 = 0 args = ’’ args_abs = ’’ args_abs_crc32 = 0# 狀態碼,字節數,響應時間response_code = processed.group(’status’)bytes_sent = processed.group(’body_bytes_sent’)request_time = processed.group(’request_time’)# user_ip,cdn最后節點ip,以及是否經過F5http_x_forwarded_for = processed.group(’http_x_forwarded_for’)ips = http_x_forwarded_for.split()# user_ip:用戶真實ip# cdn_ip: CDN最后節點的ip,’’表示沒經過CDN;’-’表示沒經過CDN和F5if http_x_forwarded_for == ’-’: ’’’沒經過CDN和F5’’’ user_ip = remote_addr cdn_ip = ’-’elif ips[0] == remote_addr: ’’’沒經過CDN,經過F5’’’ user_ip = remote_addr cdn_ip = ’’else: ’’’經過CDN和F5’’’ user_ip = ips[0].rstrip(’,’) cdn_ip = ips[-1]return (server, uri_abs, uri_abs_crc32, args_abs, args_abs_crc32, new_time, response_code, bytes_sent,request_time, user_ip, cdn_ip, request_method, uri, args)def text_abstract(text, what): '''進一步處理uri和args,將其做抽象化,方便對其進行歸類 如uri: /article/10.html 抽象為 /article/?.html 如args: s=你好&type=0 抽象為 s=?&type=? 規則:待處理部分由[a-zA-Z-_]組成的,則保留,其他情況值轉為’?’ ''' tmp_abs = ’’ if what == ’uri’:uri_list = [tmp for tmp in text.split(’/’) if tmp != ’’]if len(uri_list) == 0: ’’’uri為'/'的情況’’’ tmp_abs = ’/’else: for i in range(len(uri_list)):if not re.match(r’[a-zA-Z-_]+?(..*)?$’, uri_list[i]): ’’’uri不符合規則時,進行轉換’’’ if ’.’ in uri_list[i]:if not re.match(r’[a-zA-Z-_]+$’, uri_list[i].split(’.’)[0]): uri_list[i] = ’?.’ + uri_list[i].split(’.’)[1] else:uri_list[i] = ’?’ for v in uri_list:tmp_abs += ’/{}’.format(v) if text.endswith(’/’):’’’如果原uri后面有'/',要保留’’’tmp_abs += ’/’ elif what == ’args’: if text == ’’:tmp_abs = ’’ else:try: tmp_dict = OrderedDict((tmp.split(’=’) for tmp in text.split(’&’))) for k, v in tmp_dict.items():if not re.match(r’[a-zA-Z-_]+$’, v): ’’’除了value值為全字母的情況,都進行轉換’’’ tmp_dict[k] = ’?’ for k, v in tmp_dict.items():if tmp_abs == ’’: tmp_abs += ’{}={}’.format(k, v)else: tmp_abs += ’&{}={}’.format(k, v)except ValueError: ’’’參數中沒有= 或者 即沒&也沒= 會拋出ValueError’’’ tmp_abs = ’?’ return tmp_absdef insert_data(line_data, cursor, results, limit, t_name, l_name): ''' 記錄處理之后的數據,累積limit條執行一次插入 line_data:每行處理之前的字符串數據; limit:每limit行執行一次數據插入; t_name:對應的表名; l_name:日志文件名 ''' line_result = process_line(line_data) results.append(line_result) # print(’len(result):{}’.format(len(result))) #debug if len(results) == limit:insert_correct(cursor, results, t_name, l_name)results.clear()print(’{} {} 處理至 {}’.format(time.strftime(’%H:%M:%S’, time.localtime()), l_name, line_result[5]))def insert_correct(cursor, results, t_name, l_name): '''在插入數據過程中處理異常''' insert_sql = ’insert into {} (server,uri_abs,uri_abs_crc32,args_abs,args_abs_crc32,time_local,response_code,’ ’bytes_sent,request_time,user_ip,cdn_ip,request_method,uri,args) ’ ’values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)’.format(t_name) try:cursor.executemany(insert_sql, results) except pymysql.err.Warning as err:print(’n{} Warning: {}’.format(l_name, err)) except pymysql.err.MySQLError as err:print(’n{} Error: {}’.format(l_name, err))print(’插入數據時出錯...n’)connection.close()exit(10)def get_prev_num(t_name, l_name): '''取得今天已入庫的行數 t_name:表名 l_name:日志文件名''' try:con_cur.execute(’select min(id) from {0} where time_local=(’’select min(time_local) from {0} where time_local>='{1}')’.format(t_name, today_start))min_id = con_cur.fetchone()[0]if min_id is not None: # 假如有今天的數據 con_cur.execute(’select max(id) from {}’.format(t_name)) max_id = con_cur.fetchone()[0] con_cur.execute(’select count(*) from {} where id>={} and id<={} and server='{}'’.format(t_name, min_id, max_id, server)) prev_num = con_cur.fetchone()[0]else: prev_num = 0return prev_num except pymysql.err.MySQLError as err:print(’Error: {}’.format(err))print(’Error:未取得已入庫的行數,本次跳過{}n’.format(l_name))returndef del_old_data(t_name, l_name, n=3): '''刪除n天前的數據,n默認為3''' # n天前的日期間 three_days_ago = time.strftime(’%Y-%m-%d %H:%M:%S’, time.localtime(time.time() - 3600 * 24 * n)) try:con_cur.execute(’select max(id) from {0} where time_local=(’’select max(time_local) from {0} where time_local!='0000-00-00 00:00:00' and time_local<='{1}')’.format( t_name, three_days_ago))max_id = con_cur.fetchone()[0]if max_id is not None: con_cur.execute(’delete from {} where id<={}’.format(t_name, max_id)) except pymysql.err.MySQLError as err:print(’n{} Error: {}’.format(l_name, err))print(’未能刪除表{}天前的數據...n’.format(n))def main_loop(log_name): '''主邏輯 log_name:日志文件名''' table_name = log_name.split(’.access’)[0].replace(’.’, ’_’) # 將域名例如m.api轉換成m_api,因為表名中不能包含’.’ results = [] # 創建表 create_table(table_name) # 當前日志文件總行數 num = int(subprocess.run(’wc -l {}’.format(log_dir + log_name), shell=True, stdout=subprocess.PIPE, universal_newlines=True).stdout.split()[0]) print(’num: {}’.format(num)) # debug # 上一次處理到的行數 prev_num = get_prev_num(table_name, log_name) if prev_num is not None:# 根據當前行數和上次處理之后記錄的行數對比,來決定本次要處理的行數范圍i = 0with open(log_name) as fp: for line in fp:i += 1if i <= prev_num: continueelif prev_num < i <= num: insert_data(line, con_cur, results, 1000, table_name, log_name)else: break# 插入不足1000行的resultsif len(results) > 0: insert_correct(con_cur, results, table_name, log_name) del_old_data(table_name, log_name)if __name__ == '__main__': # 檢測如果當前已經有該腳本在運行,則退出 if_run = subprocess.run(’ps -ef|grep {}|grep -v grep|grep -v '/bin/sh'|wc -l’.format(argv[0]), shell=True, stdout=subprocess.PIPE).stdout if if_run.decode().strip(’n’) == ’1’:os.chdir(log_dir)logs_list = os.listdir(log_dir)logs_list = [i for i in logs_list if ’access’ in i and os.path.isfile(i) and i.split(’.access’)[0] in todo]if len(logs_list) > 0: # 并行 with Pool(len(logs_list)) as p:p.map(main_loop, logs_list)
報錯如下
multiprocessing.pool.RemoteTraceback:'''Traceback (most recent call last): File '/usr/lib/python3.5/multiprocessing/pool.py', line 119, in worker result = (True, func(*args, **kwds)) File '/usr/lib/python3.5/multiprocessing/pool.py', line 44, in mapstar return list(map(*args)) File 'log.py', line 287, in main_loop create_table(table_name) File 'log.py', line 85, in create_table my_connect() File 'log.py', line 76, in my_connect charset=’utf8mb4’, port=mysql_port, autocommit=True, database=mysql_database) File '/usr/local/lib/python3.5/dist-packages/pymysql/__init__.py', line 90, in Connect return Connection(*args, **kwargs) File '/usr/local/lib/python3.5/dist-packages/pymysql/connections.py', line 706, in __init__ self.connect() File '/usr/local/lib/python3.5/dist-packages/pymysql/connections.py', line 922, in connect self.host_info = 'socket %s:%d' % (self.host, self.port)TypeError: %d format: a number is required, not str'''The above exception was the direct cause of the following exception:Traceback (most recent call last): File 'log.py', line 324, in <module> p.map(main_loop, logs_list) File '/usr/lib/python3.5/multiprocessing/pool.py', line 260, in map return self._map_async(func, iterable, mapstar, chunksize).get() File '/usr/lib/python3.5/multiprocessing/pool.py', line 608, in get raise self._valueTypeError: %d format: a number is required, not str
py3.5.2
這個哪里錯了
問題解答
回答1:port不是int類型,mysql_port的需要填一個int而不是str類型的
相關文章:
1. php - 想要遠程推送emjio ios端怎么搞 需要怎么配合2. python - 如何把152753這個字符串轉變成時間格式15:27:533. Javascript 比較不同編碼的字符串4. mysql - 關于時間的入庫問題,大神們你們存數據庫的時間是取本地的時間,還是取utc的時間?5. python - 數據無法插入到mysql表里6. mysql - 類似于之類的通知系統如何設計數據庫7. css3 - transform的順序不同為何會使元素的形狀不同。8. mysql優化 - mysql EXPLAIN之后怎么看結果進行優化 ?9. python - 關于beautifulsoup獲取文檔內容10. python - 速度最快的啟動界面GUI
