概述:Python
[TOC]
Numpy 1 2 import numpy as npnp.__version__
创建随机值及数组 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 a = np.random.randint(10 , size=(3 , 3 )) print (np.array([1 ,2 ,3 ]))print (np.array([(1 ,2 ,3 ),(4 ,5 ,6 )]))print (np.zeros((3 ,3 )))print (np.ones((2 ,3 ,4 )))print (np.arange(5 ))print (np.arange(6 ).reshape(2 ,3 ))print (np.eye(3 ,3 ))print (np.linspace(1 , 10 , num=6 ))print (np.random.rand(2 ,3 ))print (np.random.randint(5 , size=(2 ,3 )))print (np.fromfunction(lambda i, j: i + j, (3 , 3 )))A = np.array([[1 ,2 ],[3 ,4 ]]) B = np.array([[5 ,6 ],[7 ,8 ]]) print (np.random.random((3 , 2 )))print (np.random.rand())print (np.random.rand(5 ))print (np.random.rand(2 ,3 ))
运算及转换 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 print (np.dot(A, B))print (np.mat(A) * np.mat(B))print (A.T)print (np.linalg.inv(A))print (np.sin(np.array([10 ,20 ,30 ,40 ,50 ])))print (np.exp(a))print (np.sqrt(a))print (np.power(a, 3 ))a.reshape(2 , 3 ) a.resize(2 , 3 ) a.ravel() print (np.vstack((a, b)))print (np.hstack((a, b)))print (np.hsplit(a, 3 ))print (np.vsplit(a, 3 ))print (np.max (a, axis=0 ))print (np.min (a, axis=1 ))print (np.argmax(a, axis=0 ))print (np.median(a, axis=0 ))print (np.mean(a, axis=1 ))print (np.average(a, axis=0 ))print (np.var(a, axis=1 ))print (np.std(a, axis=0 ))print (np.random.randn())print (np.random.randn(2 ,3 ))print (np.random.standard_normal(2 ))print (np.random.standard_normal((2 ,3 )))print (np.random.randint(2 ))print (np.random.randint(2 ,size=5 ))print (np.random.randint(2 ,6 ,size=5 ))print (np.random.choice(2 ,2 ))print (np.random.choice(np.array(['a' ,'b' ,'c' ,'f' ]),(2 ,3 )))list1 = [1 ,2 ,3 ,4 ,5 ] np.random.shuffle(list1) print (list1)arr = np.arange(9 ).reshape(3 ,3 ) print (arr)np.random.shuffle(arr) print (arr)print (np.random.permutation(5 ))list1 = [1 ,2 ,3 ,4 ] np.random.permutation(list1) print (list1)
pandas 读取/写入文件 1 2 3 4 5 6 7 8 9 pd.DataFrame.from_csv("csv_file" ) pd.read_csv("csv_file" ) pd.read_excel("excel_file" ) df.to_csv("data.csv" , sep="," , index=False )
DF相关信息 1 2 3 4 5 6 7 8 9 10 11 12 13 df.describe() df.info() df.index df.columns df = df.set_index('trade_date' )
处理缺失值 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 df.dropna(axis=0 ,how='all' ) df=df.fillna(value=0 ) df['a' ] = df['a' ].fillna(df['a' ].mean()) df['a' ] = df['a' ].fillna(df['a' ].median()) df['a' ] = df['a' ].fillna(stats.mode(df['a' ])[0 ][0 ]) from scipy.stats import statsdf['a' ] = df['a' ].fillna(method='pad' ) from scipy.stats import statsdf['a' ] = df['a' ].fillna(method='bfill' ) pd.isnull(df.open ) df = df.replace(" " ,np.NaN)
操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 df.drop('open' , axis=1 ) del data['open' ]df.drop(df.index[[0 , 2 ]], inplace=True ) pd.to_numeric(df["open" ], errors='coerce' ) df.as_matrix() df.loc['2019-11-07' ,'open' ] data = data.cumsum()
合并数据集 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 df1 = pd.DataFrame(np.ones((3 ,4 ))*0 , columns=['a' ,'b' ,'c' ,'d' ]) df2 = pd.DataFrame(np.ones((3 ,4 ))*1 , columns=['a' ,'b' ,'c' ,'d' ]) df3 = pd.DataFrame(np.ones((3 ,4 ))*2 , columns=['a' ,'b' ,'c' ,'d' ]) res = pd.concat([df1, df2, df3], axis=0 ) res = pd.concat([df1, df2, df3], axis=0 , ignore_index=True ) df1 = pd.DataFrame(np.ones((3 ,4 ))*0 , columns=['a' ,'b' ,'c' ,'d' ], index=[1 ,2 ,3 ]) df2 = pd.DataFrame(np.ones((3 ,4 ))*1 , columns=['b' ,'c' ,'d' ,'e' ], index=[2 ,3 ,4 ]) res = pd.concat([df1, df2], axis=0 , join='outer' ) df1 = pd.DataFrame(np.ones((3 ,4 ))*0 , columns=['a' ,'b' ,'c' ,'d' ]) df2 = pd.DataFrame(np.ones((3 ,4 ))*1 , columns=['a' ,'b' ,'c' ,'d' ]) res = df1.append(df2, ignore_index=True ) df1 = pd.DataFrame(np.ones((3 ,4 ))*0 , columns=['a' ,'b' ,'c' ,'d' ]) df2 = pd.DataFrame(np.ones((3 ,4 ))*1 , columns=['a' ,'b' ,'c' ,'d' ]) df3 = pd.DataFrame(np.ones((3 ,4 ))*1 , columns=['a' ,'b' ,'c' ,'d' ]) res = df1.append([df2, df3], ignore_index=True )
数据统计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 print (df.describe())print (df.count())print (df.min ())print (df.max ())print (df['date' ][df['open' ].idxmin()])print (df['open' ].idxmax())print (df.quantile())print (df.quantile(0.75 ))print (df.sum ())print (df.mean())print (df.mean())print (df.mad())print (df.var())print (df.std())print (df.skew())print (df.kurt())print (df.cumsum())print (df.cummin())print (df.cumax())print (df.cumprod())print (df.diff())print (df.pct_change())df.columns = ["f1" ,"f2" ,"f3" ,"f4" ] pd.util.testing.makeDataFrame().head(10 ) (1 )复制表格 (2 )执行pd.read_clipboard df.info reader = pd.read_csv("http://..." ,chunksize=10 ) pd.set_option("display.max_columns" , None ) pd.set_option("display.precision" ,1 ) df.style.bar("age" ,vmin=0 ) df.style.hide_index() df.style.highlight_max("label" ) df.head(n) df = df.sort_values("open" ,ascending = False ) //对具体某列进行排序,其中ascending=False 表示降序,而True 为升序,其默认ascending为升序 df = df[df["open" ]>5555 ]
可视化 使用matplotlib和numpy配合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import pandas as pdimport matplotlib.pyplot as pltdata = pd.Series(np.random.randn(1000 ),index=np.arange(1000 )) data.plot() plt.show() data = pd.DataFrame(np.random.randn(1000 ,4 ),index=np.arange(1000 ),columns=list ("ABCD" )) data.plot() plt.show() ax = data.plot.scatter(x='A' , y='B' , color='DarkBlue' , label="Class 1(A/B)" ) bx = data.plot.scatter(x='A' , y='C' , color='LightBlue' , label="Class 2(A/C)" ,ax=ax) data.plot.scatter(x='A' , y='D' , color='LightGreen' , label='Class 3(A/D)' , ax=bx) plt.show()
在Python中一切皆为对象,所有变量被赋值后均遵循对象引用机制,在运行时需要在内存中开辟一个空间,计算完成后将结果输出至永久性存储器,当数据量过大时易出现OOM(out of memory),即内存溢出。
当一个对象不再被引用,即该对象的引用计数(指针数)为0时,就需要被回收。可以通过以下命令查看指定变量被引用的次数:
1 2 3 4 import sysa = 123 print (sys.getrefcount(a))
手动释放内存:
1 2 3 import gcgc.collect() show_memory_info('collected' )
当两个局部变量循环引用时,即当a与b互相引用时,即使函数完成后其引用数也不为0,对于该情况时可以采取人工回收。
垃圾回收 计数引用 当一个对象不再调用的时候,也就是当这个对象的引用计数(指针数)为 0 的时候,说明这个对象永不可达,自然它也就成为了垃圾,需要被回收。可以简单的理解为没有任何变量再指向它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import osimport psutildef show_memory_info (hint ): pid = os.getpid() p = psutil.Process(pid) info = p.memory_full_info() memory = info.uss / 1024. / 1024 print ('{} memory used: {} MB' .format (hint, memory))def func (): show_memory_info('initial' ) a = [i for i in range (10000000 )] show_memory_info('after a created' ) func() show_memory_info('finished' ) initial memory used: 47.19140625 MB after a created memory used: 433.91015625 MB finished memory used: 48.109375 MB
可以看到调用函数 func(),在列表 a 被创建之后,内存占用迅速增加到了 433 MB:而在函数调用结束后,内存则返回正常。这是因为,函数内部声明的列表 a 是局部变量,在函数返回后,局部变量的引用会注销掉;此时,列表 a 所指代对象的引用数为 0,Python 便会执行垃圾回收,因此之前占用的大量内存就又回来了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def func (): show_memory_info('initial' ) global a a = [i for i in range (10000000 )] show_memory_info('after a created' ) func() show_memory_info('finished' ) initial memory used: 48.88671875 MB after a created memory used: 433.94921875 MB finished memory used: 433.94921875 MB
新的这段代码中,global a 表示将 a 声明为全局变量。那么,即使函数返回后,列表的引用依然存在,于是对象就不会被垃圾回收掉,依然占用大量内存。同样,如果我们把生成的列表返回,然后在主程序中接收,那么引用依然存在,垃圾回收就不会被触发,大量内存仍然被占用着:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def func (): show_memory_info('initial' ) a = [i for i in range(10000000 )] show_memory_info('after a created' ) return aa = func() show_memory_info('finished' ) initial memory used: 47.96484375 MB after a created memory used: 434.515625 MB finished memory used: 434.515625 MB
循环回收 如果有两个对象,它们互相引用,并且不再被别的对象所引用,那么它们应该被垃圾回收吗?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def func (): show_memory_info('initial' ) a = [i for i in range (10000000 )] b = [i for i in range (10000000 )] show_memory_info('after a, b created' ) a.append(b) b.append(a) func() show_memory_info('finished' ) initial memory used: 47.984375 MB after a, b created memory used: 822.73828125 MB finished memory used: 821.73046875 MB
从结果显而易见,它们并没有被回收,但是从程序上来看,当这个函数结束的时候,作为局部变量的a,b就已经从程序意义上不存在了。但是因为它们的互相引用,导致了它们的引用数都不为0。
这时要如何规避呢?1. 从代码逻辑上进行整改,避免这种循环引用;2. 通过人工回收。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import gcdef func (): show_memory_info('initial' ) a = [i for i in range (10000000 )] b = [i for i in range (10000000 )] show_memory_info('after a, b created' ) a.append(b) b.append(a) func() gc.collect() show_memory_info('finished' ) initial memory used: 49.51171875 MB after a, b created memory used: 824.1328125 MB finished memory used: 49.98046875 MB
python面对循环引用有自动垃圾回收算法:
(1)自动垃圾回收算法——标记清除(mark-sweep)
垃圾回收机制会从根对象出发,给所有可达的对象打上标记,然后将没有标记的对象进行回收。mark-sweep使用双向链表维护一个数据结构,通常仅包含容器类对象:list、dict、tuple、instance。
(2)自动垃圾回收算法——分代回收(generational)
分代回收是一种以空间换时间的操作,python根据对象的存活时间划分为不同的集合,其中每一个集合称为一个代,将内存分为3个代,分别为:
年轻代(第0代)
中年代(第1代)
老年代(第2代)
对应3个链表,对象的存活时间越大,垃圾收集频率越低。
新创建的对象均被分配在年轻代,当年轻代链表总数达到上限时,python垃圾回收机制会被触发,将待回收对象回收,将暂不回收的对象移到中年代,老年代的对象是存活时间最久的对象。
random 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import randomrandom.random() random.randint(2 ,5 ) random.uniform(2 ,5 ) random.randrange(10 ,100 ,2 ) random.choice(['a' ,'b' ,'c' ,1 ]) p = ['python' ,'is' ,'powerful' ] random.shuffle(p) print (p)random.sample([1 ,2 ,3 ,4 ,5 ],3 ) random.sample([1 ,2 ,3 ,4 ,5 ],3 ) random.seed(5 )
🔧工具 —— pycharm 解决冲突 1 2 3 4 5 Right Click > Git > Resolve Conflicts ... > Double Click File > 选择代码行
当前运行目录 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import ospath0 = os.path.abspath(__file__) print (f'path0: {path0} ' ) path1 = os.path.dirname(__file__) print (f'path1: {path1} ' ) path2 = os.path.dirname(os.path.dirname(__file__)) print (f'path2: {path2} ' ) path3 = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) print (f'path3: {path3} ' ) path4 = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) print (f'path4: {path4} ' ) path5 = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))) print (f'path5: {path5} ' ) path6 = os.__file__ print (f'path6: {path6} ' )from pathlib import Pathprint (Path(__file__))print (Path(__file__).resolve()) print (Path(__file__).resolve().parent)print (Path(__file__).resolve().parent.parent)
合并字典 1 2 3 4 5 6 7 x = {'a': 1, 'b': 2} y = {'b': 3, 'c': 4} z = {**x, **y} # {'a': 1, 'b': 3, 'c': 4} 或者 z = x.copy() z.update(y)
环境变量 1 2 3 4 5 6 7 import osprint (os.environ.get("ENDPOINT" ))from dotenv import load_dotenvload_dotenv()
输出当前类及方法 1 2 location = f'{self.__class__.__name__} /{sys._getframe().f_code.co_name} '
线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import threadingimport timedef loop (): for i in range (5 ): time.sleep(1 ) print (i) class MyThread (threading.Thread ): def run (self ): loop() if __name__ == '__main__' : t1 = MyThread().start() t2 = MyThread().start() print ("执行完毕" )
Testcase 1 2 3 4 5 6 7 8 9 10 11 net_area: 指定网络区域 pre_test():初始化 run_test():测试用例 post_run():释放资源 使用debug_run(): 调用运行 公共部分代码放到lib包 # TestCase的SetUp和setUpTestData 区别 you should use setUpTestData for the shared data and setUp for the per-test-method client setUp是每一个测试方法前都要运行,而setUpTestData是在数据初始化时运行
定时任务_schedule python任务定时运行库 schedule 模块 schedule
(1)例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import scheduleimport timedef job (): print ("I'm working..." ) def job1 (name ): print (name) schedule.every(10 ).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at("10:30" ).do(job) schedule.every().monday.do(job) schedule.every().wednesday.at("13:15" ).do(job) schedule.every().wednesday.at("13:15" ).do(job1,'waiwen' ) while True : schedule.run_pending() time.sleep(1 )
(2)并行执行:为每一任务创建一个线程,使得任务并行工作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import threadingimport timeimport scheduledef job (): print ("I'm running on thread %s" % threading.current_thread()) def run_threaded (job_func ): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10 ).seconds.do(run_threaded, job) schedule.every(10 ).seconds.do(run_threaded, job) while True : schedule.run_pending()
(3)使用队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import Queueimport timeimport threadingimport scheduledef job (): print ("I'm working" ) def worker_main (): while 1 : job_func = jobqueue.get() job_func() jobqueue.task_done() jobqueue = Queue.Queue() schedule.every(10 ).seconds.do(jobqueue.put, job) schedule.every(10 ).seconds.do(jobqueue.put, job) schedule.every(10 ).seconds.do(jobqueue.put, job) schedule.every(10 ).seconds.do(jobqueue.put, job) schedule.every(10 ).seconds.do(jobqueue.put, job) worker_thread = threading.Thread(target=worker_main) worker_thread.start() while True : schedule.run_pending()
(4)仅执行一次
1 2 3 4 5 def job_that_executes_once (): return schedule.CancelJob schedule.every().day.at('22:30' ).do(job_that_executes_once)
(5)取消任务
1 2 3 4 5 6 7 8 9 10 11 12 def greet (name ): print ('Hello {}' .format (name)) schedule.every().day.do(greet, 'Andrea' ).tag('daily-tasks' , 'friend' ) schedule.every().hour.do(greet, 'John' ).tag('hourly-tasks' , 'friend' ) schedule.every().hour.do(greet, 'Monica' ).tag('hourly-tasks' , 'customer' ) schedule.every().day.do(greet, 'Derek' ).tag('daily-tasks' , 'guest' ) schedule.clear('daily-tasks' ) while True : schedule.run_pending()
补充 - Ubuntu - crontab
1 2 * * * * * source ~/.virtualenvrc && workon cd_sdk && python cipipeline.py #一分钟执行一次 */2 * * * * #两分钟执行一次
方法
描述:classmethod修饰符对应的函数不需要实例化,不需要 self 参数,但第一个参数需要是表示自身类的 cls 参数,可以来调用类的属性,类的方法,实例化对象等。
@classmethod要与cls配合使用
1 2 3 4 5 6 7 8 9 10 11 class A (object ): bar = 1 def func1 (self ): print ('foo' ) @classmethod def func2 (cls ): print ('func2' ) print (cls.bar) cls().func1() A.func2()
描述:当不需要引用类或者实例时,建议将方法定义为静态方法。
描述:实例方法只能被实例对象调用
实例方法要与self配合使用
装饰器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import timedef timer (func ): '''统计函数运行时间的装饰器''' def wrapper (): start = time.time() func() end = time.time() used = end - start print (f'{func.__name__} used {used} ' ) return wrapper def step1 (): print ('step1.......' ) def step2 (): print ('step2......' ) def step3 (): print ('step3......' ) timed_step1 = timer(step1) timed_step2 = timer(step2) timed_step3 = timer(step3) timed_step1() timed_step2() timed_step3()
语法糖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @timer def step1 (): print ('step1.......' ) @timer def step2 (): print ('step2......' ) @timer def step3 (): print ('step3......' ) step1() step2() step3()
带参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @timer def step1 (num ): print (f'我走了#{num} 步' ) step1(5 ) import timedef timer (func ): '''统计函数运行时间的装饰器''' def wrapper (*args, **kwargs ): start = time.time() func(*args, **kwargs) end = time.time() used = end - start print (f'{func.__name__} used {used} ' ) return wrapper
被装饰的函数有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def timer (func ): '''统计函数运行时间的装饰器''' def wrapper (*args, **kwargs ): start = time.time() ret_value = func(*args, **kwargs) end = time.time() used = end - start print (f'{func.__name__} used {used} ' ) return ret_value return wrapper @timer def add (num1, num2 ): return num1 + num2 sum = add(5 , 8 )print (sum )
copy 浅拷贝:仅复制被拷贝对象原先的数据,并指向相同的存储位置(共用一部分数据存储位置)
![image-20210107191323129](/Users/junmingguo/Library/Application Support/typora-user-images/image-20210107191323129.png)
被拷贝的数据中存在可变数据,浅拷贝的对象与被拷贝对象保持一致
若修改不可变数据(例如int类型),则浅拷贝的对象不更新
![image-20210107191613395](/Users/junmingguo/Library/Application Support/typora-user-images/image-20210107191613395.png)
深拷贝:复制一个完全独立的对象
1 2 3 4 import copy l1 = [1, 2, 3, [22, 33]] l2 = copy.deepcopy(l1) l1.append(666)
![image-20210107191722877](/Users/junmingguo/Library/Application Support/typora-user-images/image-20210107191722877.png)
deepcopy 使用示例 1 2 3 4 5 6 7 a = {1 :1 , 2 :2 } b = a b[3 ] = 3 print (a) print (b)
JSON loads & dumps dumps : 将Python对象转为JSON
python对象到json字符串的转换规则:
Python
JSON
dict
object
list, tuple
array
str, unicode
string
int, long, float
number
True
true
False
false
None
null
pythonlist2json
1 json.dumps(mylist,ensure_ascii=False )
eg.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 python2json = {} listData = [1 ,2 ,3 ] python2json["listData" ] = listData python2json["strData" ] = "test python obj 2 json" python2json['bool' ] = False import jsonjson_str = json.dumps(python2json) print (json_str)
Loads : 将JSON转为Python对象
1 2 3 4 5 6 7 8 import jsonstr = '{"listData": [1, 2, 3], "bool": false, "strData": "test python obj 2 json"}' json2python = json.loads(str ) print (type (json2python))print (json2python)
绝对导入 from __future__ import absolute_import
Python 2.4或之前默认是相对引用,即先在本目录下寻找模块。
absolute_import作用:若本目录中有模块名与系统(sys.path)模块同名冲突,而想要引用的是系统模块时,该声明就起作用了,仅用于python2系列。
调用import string时引入的就是系统的标准string.py
调用from pkg import string来引入当前目录的string.py(pkg为你当前文件夹名称)
注意:并不建议使用与内置库相同的名称作为文件名
_
以单个下划线开头的变量或方法仅供内部使用,其中针对变量只是约定而已,而对于方法而言,在使用from abc import *时将无法导入单下划线开头的方法,常规导入import abc,可以正常导入。
以单个下划线结尾的变量或者方法:像class或def这样的关键字不能用作Python中的变量名称,此时可以在名称末尾附加一个下划线(例如class_)来避免命名冲突。
以双个下划线开头的变量,例如class的变量,使用dir时将无法直接获取对应的变量名,而是_类名__变量,称为名称修饰,防止变量在子类中被重写,双下划线名称修饰对程序员是完全透明的。
双前导和双末尾下划线的名称,用于特殊用途。这样的例子有:__init__(对象构造函数),或__call__(它使得一个对象可以被调用)。
单例模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 def __new__ (cls, *args, **kwargs ): if cls._instance is None : cls._instance = object .__new__(cls, *args, **kwargs) return cls._instance from functools import wrapsdef handle_client_exception (OperationException ): """ 处理客户端异常 """ def decorate (func ): @wraps(func ) def wrapper (*args, **kwargs ): try : response = func(*args, **kwargs) if hasattr (response, 'code' ) and response.code: raise OperationException( error_code=response.code, func_name=func.__name__, message=response.message ) except Exception as e: raise return response return wrapper return decorate
init & new 1 2 3 4 5 6 7 8 9 10 11 12 13 class A (object ): def __new__ (cls ): print ("A.__new__ called" ) return super (A, cls).__new__(cls) def __init__ (self ): print ("A.__init__ called" ) A()
property 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class Student (object ): def __init__ (self ): self._age = None def age_getter (self ): return self._age def age_setter (self, age ): self._age = age age = property (age_getter, age_setter) s=Student() print (s.age_setter(11 ))print (s.age_getter())s.age = 22 print (s.age)print (s._age)
装饰器来实现getter、setter、和deleter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class Student (object ): def __init__ (self ): self._age = None @property def age (self ): return self._age @age.setter def age (self, age ): self._age = age @age.deleter def age (self ): del self._age student = Student() student.age = 20 print (student.age)del student.age
消除警告 1 2 import warnings warnings.filterwarnings("ignore" )
os os.remove()报错:权限不够
1 2 3 import shutilshutil.rmtree(file_name)
获取当前工作目录,即当前Python脚本工作的目录路径
获取指定工作目录的文件名
删除指定文件
删除多个目录
1 os.removedirs(r"c:\python" )
小数点相加误差 1 2 3 4 from decimal import getcontext,Decimal getcontext().prec = 2 print (Decimal(0.1)+Decimal(0.2))
CSV编码 保存csv文件时中文乱码
1 data.to_csv('basic.csv' ,encoding="utf_8_sig" )
读取csv文件的中文编码格式
1 2 3 4 5 6 7 import pandas as pd fund_return = pd.read_csv('train_fund_return.csv' ) index_return = pd.read_csv('train_index_return.csv' ,encoding='gb18030' ) print (index_return)
储存为csv文件时不添加index
1 2 3 4 5 6 7 import tushare as ts stock_data = ts.get_k_data('002253' ) del stock_data['code' ] stock_data.to_csv('002253.csv' ,index=None)
单行实现嵌套for循环 1 2 3 4 5 6 7 list1 = range (1 ,3 ) list2 = range (4 ,6 ) list3 = range (7 ,9 ) for item1 in list1: for item2 in list2: for item3 in list3: print (item1+item2+item3)
1 2 3 4 5 6 from itertools import productlist1 = range (1 ,3 ) list2 = range (4 ,6 ) list3 = range (7 ,9 ) for item1,item2,item3 in product(list1, list2, list3): print (item1+item2+item3)
计算函数运行时间 1 2 3 4 5 6 7 8 import timestart = time.time() end = time.time() print (end-start)
更简单的计算方法
1 2 3 4 5 6 7 8 9 import timeimport timeitdef run_sleep (second ): print (second) time.sleep(second) print (timeit.timeit(lambda :run_sleep(2 ), number=5 ))
列表解包 1 2 3 array = [['a' , 'b' ], ['c' , 'd' ], ['e' , 'f' ]] transposed = zip (*array) print (list (transposed))
合并字典 1 2 3 4 5 dicta,dictb dictc = dicta.copy() dictc.update(dictb) 或者 {**dicta, **dictb}
运行过程输出源代码 1 2 3 4 5 6 7 8 import inspectdef add (x, y ): return x + y print ("=====source code=====" )print (inspect.getsource(add))
时间戳 时间格式→时间戳
1 2 3 4 5 6 7 import time dt = "2016-05-05 20:28:54" timeArray = time.strptime(dt, "%Y-%m-%d %H:%M:%S" ) timestamp = time.mktime(timeArray) print (timestamp)
时间戳→时间格式
1 2 3 4 5 6 timestamp = 1462451334 time_local = time.localtime(timestamp) dt = time.strftime("%Y-%m-%d %H:%M:%S" ,time_local) print (dt)
字符串转换
b’input\n’ # bytes字节符,打印以b开头
r’input\n’ # 非转义原生字符,经处理’\n’变成了’\‘和’n’。也就是\n表示的是两个字符,而不是换行
u’input\n’ # unicode编码字符,python3默认字符串编码方式。
1 2 3 4 5 6 7 a = b'sql\xe6\x89\xa7\xe8\xa1\x8c\xe5\xa4\xb1\xe8\xb4\xa5' c = a.decode("utf-8" ) a = u'\u5f55\u5165\u4ee3\u7801\u5e93' print (a)
pip下载源优先顺序调整 1 2 find ~ -name 'pip.conf' vi /Users/junmingguo/.pip/pip.conf
1 2 3 4 5 6 7 8 9 10 [global] index-url = http://mirrors.example.com/repository/simple/ extra-index-url = http://pypi.example.com/simple/ https://mirrors.example.com/pypi/simple/ trusted-host = mirrors.abc.example.com mirrors.example.com pypi.example.com no-cache-dir = true disable-pip-version-check = true
其中index-url为第一下载源,extra-index-url为备用下载源
反射 定义:通过字符串映射object对象的方法或者属性
方法:
1 2 3 4 hasattr(obj,name_str): 判断object是否有name_str这个方法或者属性 getattr(obj,name_str): 获取object 对象中与name_str同名的方法或者属性 setattr(obj,name_str,value ): 为object 对象设置一个以name_str为名的value 方法或者属性 delattr(obj,name_str): 删除object 对象中的name_str方法或者属性
*arg **kwargs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def demo (a, *args, **kwargs ): print (a) print (args) print (kwargs) demo(1 ) demo(1 , 2 , 3 ) demo(a=1 , name='jm' )
python2 & 3 下的range()
避免转义字符 1 2 3 r'abc\ndef' 'abc\ndef'
根据字典键值进行排序 1 2 a={1 :2 , 2 :1 , 3 :3 , 4 :0 } b=dict (sorted (a.items(), key=lambda item: item[1 ]))
str.isalpha python2 下对’中文’进行isalpha则返回False
python3 下对’中文’ 进行isalpha则返回True,要实现与python2一致的逻辑则需要先对字符串进行编码(encode)
1 2 3 4 5 6 a = "runoob" print a.isalpha()b = "runoob菜鸟教程" print b.isalpha()
1 2 3 a = "runoob" print (a.encode('UTF-8' ).isalpha())
(.*?) & (.*)区别 1 2 3 4 5 6 7 8 9 import res="<a>哈哈</a><a>嘿嘿</a>" print (re.findall("<a>(.*)</a>" ,s))print (re.findall("<a>(.*?)</a>" ,s))
int()强制转换 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 int (1.2 )int (-1.2 )int (+1.2 )int ("1" )int ("-1" )int ("1.2" )def __init__ (self, x, base=10 ): """ int([x]) -> integer int(x, base=10) -> integer Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating point numbers, this truncates towards zero. If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal. >>> int('0b100', base=0) 4 # (copied from class doc) """ pass int (x, base=10 )1. 若x非数字则必须为字符串或者字节、字节数组实例2. base表示进制,默认为10 ,有效范围是[0 ,2 -36 ],0 表示将字符串转换为整型,无论base值为多少,转换后的数值为十进制整数。
不支持double字段
[index:]数组索引不存在
complex complex是复数类型,由实部和虚部组成,即 real + imag*j
1 2 3 4 5 6 complex (1 , 2 )complex (1 )complex ("1" )
groupby & itemgetter 分组函数 groupby;itemgetter 用于根据key获取字典中对应的值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 from itertools import groupbyfrom operator import itemgetterfrom operator import itemgetterfrom itertools import groupbyd1={'name' :'zhangsan' ,'age' :20 ,'country' :'China' } d2={'name' :'wangwu' ,'age' :19 ,'country' :'USA' } d3={'name' :'lisi' ,'age' :22 ,'country' :'JP' } d4={'name' :'zhaoliu' ,'age' :22 ,'country' :'USA' } d5={'name' :'pengqi' ,'age' :22 ,'country' :'USA' } d6={'name' :'lijiu' ,'age' :22 ,'country' :'China' } lst=[d1,d2,d3,d4,d5,d6] lst.sort(key=itemgetter('age' )) print (lst)lstg = groupby(lst,itemgetter('country' )) for key,group in lstg: print (key,list (group))
urljoin 通过举例查看规律,第一个参数为base,将至少保留至.com之前(若url不为空则base部分以/结尾),根据第二个参数url来填补,若url部分不包含/则直接替换或者补充至base后续,若url部分包含/则直接填补至base后续。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from urllib.parse import urljoinprint (urljoin("http://www.chachabei.com/folder/currentpage.html" , "anotherpage.html" ))print (urljoin("http://www.chachabei.com/folder/currentpage.html" , "/anotherpage.html" ))print (urljoin("http://www.chachabei.com/folder/currentpage.html" , "folder2/anotherpage.html" ))print (urljoin("http://www.chachabei.com/folder/currentpage.html" , "/folder2/anotherpage.html" ))print (urljoin("http://www.chachabei.com/abc/folder/currentpage.html" , "/folder2/anotherpage.html" ))print (urljoin("http://www.chachabei.com/abc/folder/currentpage.html" , "../anotherpage.html" ))print (urljoin("" , "../anotherpage.html" ))
importlib 通过字符串名导入模块
1 2 3 4 5 6 import importlibmath = importlib.import_module('math' ) math.sin(2 ) mod = importlib.import_module('urllib.request' ) u = mod.urlopen('http://www.baidu.com' )
import_module只是简单地执行和import相同的步骤,但是返回生成的模块对象。你只需要将其存储在一个变量,然后像正常的模块一样使用。
相对导入:
1 2 3 import importlibb = importlib.import_module('.b' , __package__)
# @property
class Student(object):
    """Student with an explicit getter/setter pair for the score."""

    def get_score(self):
        """Return the score previously stored by set_score."""
        return self._score

    def set_score(self, value):
        """Validate and store the score.

        :param value: integer score in the range 0..100
        :raises ValueError: if value is not an int or is out of range
        """
        if not isinstance(value, int):
            raise ValueError('score must be an integer!')
        if value < 0 or value > 100:
            raise ValueError('score must between 0 ~ 100!')
        self._score = value
s = Student()
s.set_score(60)
s.get_score()    # 60
使用@property 后,方法可以像属性一样访问,调用时不需要加 ( ),类似于对象的字段。
class Student(object):
    """Student whose score is exposed as a validated property."""

    @property
    def score(self):
        """Return the stored score (read as ``s.score``, no parentheses)."""
        return self._score

    @score.setter
    def score(self, value):
        """Validate and store the score (written as ``s.score = value``).

        :param value: integer score in the range 0..100
        :raises ValueError: if value is not an int or is out of range
        """
        if not isinstance(value, int):
            raise ValueError('score must be an integer!')
        if value < 0 or value > 100:
            raise ValueError('score must between 0 ~ 100!')
        self._score = value
进程、线程、协程 进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
线程,被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。
协程:一个程序可以包含多个协程,可以对比于一个进程包含多个线程,因而下面我们来比较协程和线程:我们知道多个线程相对独立,有自己的上下文,切换受系统控制;而协程也相对独立,有自己的上下文,但是其切换由自己控制,由当前协程切换到其他协程由当前协程来控制。
守护线程与用户线程
用户线程:我们平常创建的普通线程。
守护线程:用来服务于用户线程;不需要上层逻辑介入。
在JAVA中,当线程只剩下守护线程的时候,JVM就会退出;补充一点如果还有其他的任意一个用户线程还在,JVM就不会退出。
eg.多线程
听一首音乐假如耗时1秒,看一部电影假如耗时5秒,用两个函数定义这两个任务如下
import time
from functools import wraps


def fn_timer(function):
    """Decorator that prints how long each call to ``function`` takes.

    :param function: the callable to time
    :return: a wrapper that forwards all arguments and returns the wrapped
        function's result
    """
    @wraps(function)
    def function_timer(*args, **kwargs):
        t0 = time.time()
        result = function(*args, **kwargs)
        t1 = time.time()
        print('[finished function:{func_name} in {time:.2f}s]'.format(
            func_name=function.__name__, time=t1 - t0))
        return result
    return function_timer


def music(name):
    """Simulate listening to one song: print a message, then block for 1 second."""
    # print() call instead of the Python 2 print statement, which is a
    # SyntaxError on Python 3.
    print('I am listening to music {0}'.format(name))
    time.sleep(1)


def movie(name):
    """Simulate watching one movie: print a message, then block for 5 seconds."""
    print('I am watching movie {0}'.format(name))
    time.sleep(5)
方案一:先一个个听完10首音乐,再一个个看完2部电影,顺序完成,代码如下:
@fn_timer
def single_thread():
    """Run every task sequentially: 10 songs first, then 2 movies."""
    for i in range(10):
        music(i)
    for i in range(2):
        movie(i)
让我们执行一下这段代码,输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 I am listening to music 0 I am listening to music 1 I am listening to music 2 I am listening to music 3 I am listening to music 4 I am listening to music 5 I am listening to music 6 I am listening to music 7 I am listening to music 8 I am listening to music 9 I am watching movie 0 I am watching movie 1 [finished function:single_thread in 20.14s]
方案二:同时听多首音乐,同时看多部电影进行,代码如下:
import threading


@fn_timer
def multi_thread():
    """Run each song and each movie in its own thread and wait for all of them."""
    # daemon=True replaces the deprecated Thread.setDaemon(True).
    threads = [
        threading.Thread(target=music, args=(i,), daemon=True)
        for i in range(10)
    ]
    threads += [
        threading.Thread(target=movie, args=(i,), daemon=True)
        for i in range(2)
    ]
    for t in threads:
        t.start()
    # Join every thread so the timer covers the slowest task.
    for t in threads:
        t.join()


multi_thread()
执行上述代码,运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 I am listening to music 0 I am listening to music 1 I am listening to music 2 I am listening to music 3 I am listening to music 4 I am listening to music 5 I am listening to music 6 I am listening to music 7 I am listening to music 8 I am listening to music 9 I am watching movie 0 I am watching movie 1 [finished function:multi_thread in 5.02s]
这次只用了5秒就完成了,完成效率显著提升。这次使用多线程执行多个任务,所有任务最终的总耗时 = 耗时最长的那个单个任务的耗时,即看一部电影的5秒钟时间。
方案三:使用线程池。上面使用多线程的方式比较繁琐,下面使用线程池来实现:
# multiprocessing.dummy.Pool has the same API as multiprocessing.Pool but is
# backed by threads, which is what this "thread pool" example needs.
from multiprocessing.dummy import Pool


@fn_timer
def use_pool():
    """Run the movie and music tasks on a pool of 20 worker threads."""
    pool = Pool(20)
    # Pool.map blocks until its whole batch is finished, so the two batches
    # run one after the other.
    pool.map(movie, range(2))
    pool.map(music, range(10))
    pool.close()
    pool.join()


use_pool()
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 I am listening to music 0 I am listening to music 1 I am listening to music 2 I am listening to music 3 I am listening to music 4 I am listening to music 5 I am listening to music 6 I am listening to music 7 I am listening to music 8 I am listening to music 9 I am watching movie 0 I am watching movie 1 [finished function:use_pool in 6.12s]
可以看出使用线程池反而比手工调度线程多耗时一秒钟。这是因为 pool.map 是阻塞调用:第一次 map 要等电影任务全部结束(约5秒)才返回,之后才开始第二次 map 的音乐任务(约1秒),两批任务串行执行,总耗时约为 5 + 1 = 6 秒,而不是线程调度和线程切换的开销造成的。
eg. 多进程和进程池的使用
import os
import time
from multiprocessing import Process, Pool
from functools import wraps


def fn_timer(function):
    """Decorator that prints how long each call to ``function`` takes.

    :param function: the callable to time
    :return: a wrapper that forwards all arguments and returns the wrapped
        function's result
    """
    @wraps(function)
    def function_timer(*args, **kwargs):
        t0 = time.time()
        result = function(*args, **kwargs)
        t1 = time.time()
        print('[finished function:{func_name} in {time:.2f}s]'.format(
            func_name=function.__name__, time=t1 - t0))
        return result
    return function_timer


@fn_timer
def do_simple_task(task_name):
    """Print the worker's pid and the task name, block 1.2 s, return the name."""
    print('Run child process {0}, task name is: {1}'.format(os.getpid(), task_name))
    time.sleep(1.2)
    return task_name


@fn_timer
def test_simple_multi_process():
    """Start two child processes by hand and wait for both to finish."""
    p1 = Process(target=do_simple_task, args=('task1',))
    p2 = Process(target=do_simple_task, args=('task2',))
    print('Process will start...')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('Process end.')


@fn_timer
def test_use_process_pool():
    """Run seven tasks on a pool of five worker processes and print the results."""
    pool = Pool(5)
    task_names = ['task{0}'.format(i) for i in range(7)]
    # map_async returns immediately; the results are collected after join().
    results = pool.map_async(do_simple_task, task_names)
    print('Many processes will start...')
    pool.close()
    pool.join()
    print('All processes end, results is: {0}'.format(results.get()))


def main():
    test_simple_multi_process()
    # Recorded output:
    #   Process will start...
    #   Run child process 45824, task name is: task1
    #   Run child process 45825, task name is: task2
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   Process end.
    #   [finished function:test_simple_multi_process in 1.21s]

    test_use_process_pool()
    # Recorded output:
    #   Many processes will start...
    #   Run child process 45826, task name is: task0
    #   Run child process 45827, task name is: task1
    #   Run child process 45828, task name is: task2
    #   Run child process 45829, task name is: task3
    #   Run child process 45830, task name is: task4
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   Run child process 45826, task name is: task5
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   Run child process 45827, task name is: task6
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   All processes end, results is: ['task0', 'task1', 'task2', 'task3', 'task4', 'task5', 'task6']
    #   [finished function:test_use_process_pool in 2.52s]


if __name__ == '__main__':
    main()
进程之间的通信 进程间的通信采用队列来实现,实例代码如下:
import time
from multiprocessing import Process, Queue
import random
from functools import wraps


def fn_timer(function):
    """Decorator that prints how long each call to ``function`` takes.

    :param function: the callable to time
    :return: a wrapper that forwards all arguments and returns the wrapped
        function's result
    """
    @wraps(function)
    def function_timer(*args, **kwargs):
        t0 = time.time()
        result = function(*args, **kwargs)
        t1 = time.time()
        print('[finished function:{func_name} in {time:.2f}s]'.format(
            func_name=function.__name__, time=t1 - t0))
        return result
    return function_timer


def write(q):
    """Producer: put 'A', 'B', 'C' on the queue, pausing up to 1 s after each."""
    for value in ['A', 'B', 'C']:
        print('Put value: {0} to queue.'.format(value))
        q.put(value)
        time.sleep(random.random())


def read(q):
    """Consumer: block on the queue forever and print every value received."""
    while True:
        value = q.get(True)
        print('Get value: {0} from queue.'.format(value))


def test_communication_between_process():
    """Run a writer and a reader process that share one Queue."""
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    # The reader loops forever, so it has to be stopped explicitly once the
    # writer is done; join afterwards so the child process is reaped.
    pr.terminate()
    pr.join()


def main():
    test_communication_between_process()
    # Recorded output:
    #   Put value: A to queue.
    #   Get value: A from queue.
    #   Put value: B to queue.
    #   Get value: B from queue.
    #   Put value: C to queue.
    #   Get value: C from queue.


if __name__ == '__main__':
    main()
协程
Python通过yield 提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
用协程下载同样的20个网页,实例代码如下:
import time
import gevent
from gevent.pool import Pool
from gevent import monkey
from functools import wraps


def fn_timer(function):
    """Decorator that prints how long each call to ``function`` takes.

    :param function: the callable to time
    :return: a wrapper that forwards all arguments and returns the wrapped
        function's result
    """
    @wraps(function)
    def function_timer(*args, **kwargs):
        t0 = time.time()
        result = function(*args, **kwargs)
        t1 = time.time()
        print('[finished function:{func_name} in {time:.2f}s]'.format(
            func_name=function.__name__, time=t1 - t0))
        return result
    return function_timer


# Patch blocking stdlib calls (time.sleep below) so they yield to other
# greenlets instead of blocking the whole process.
monkey.patch_all()


@fn_timer
def download_using_single_thread(urls):
    """Simulate downloading pages one after another, one second per page.

    :param urls: the pages to download
    :return: the response list; nothing is appended in this simulation, so
        it is always empty
    """
    resps = []
    for index, url in enumerate(urls):
        time.sleep(1)
        print(f'下载第{index+1}个页面:{url}')
    return resps


@fn_timer
def download_using_coroutine(urls):
    """Download every page in its own greenlet.

    :param urls: the pages to download
    :return: one response list per url, in the order of ``urls``
    """
    spawns = [gevent.spawn(download_using_single_thread, [url]) for url in urls]
    rets = gevent.joinall(spawns)
    # Previously computed and then discarded; return it so callers can use it.
    return [ret.get() for ret in rets]


@fn_timer
def download_using_coroutine_pool(urls):
    """Download the pages on a gevent pool of four greenlets.

    :param urls: the pages to download
    :return: one response list per url
    """
    pool = Pool(4)
    # One single-url list per task, so every page becomes its own work item.
    urls = [[url] for url in urls]
    return pool.map(download_using_single_thread, urls)


def main():
    urls = [f'{i}.html' for i in range(1, 5)]

    # Recorded output (apparently from download_using_single_thread(urls)):
    #   下载第1个页面:1.html
    #   下载第2个页面:2.html
    #   下载第3个页面:3.html
    #   下载第4个页面:4.html
    #   [finished function:download_using_single_thread in 4.02s]

    # Recorded output (apparently from download_using_coroutine(urls)):
    #   下载第1个页面:1.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:2.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:3.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:4.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   [finished function:download_using_coroutine in 1.00s]

    download_using_coroutine_pool(urls=urls)
    # Recorded output:
    #   下载第1个页面:1.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:2.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:3.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:4.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   [finished function:download_using_coroutine_pool in 1.00s]


if __name__ == '__main__':
    main()
多线程、多进程和协程并发效率的对比
from multiprocessing.dummy import Pool as thread_pool
import time
from multiprocessing import Pool as process_pool
from gevent.pool import Pool
from gevent import monkey
from functools import wraps


def fn_timer(function):
    """Decorator that prints how long each call to ``function`` takes.

    :param function: the callable to time
    :return: a wrapper that forwards all arguments and returns the wrapped
        function's result
    """
    @wraps(function)
    def function_timer(*args, **kwargs):
        t0 = time.time()
        result = function(*args, **kwargs)
        t1 = time.time()
        print('[finished function:{func_name} in {time:.2f}s]'.format(
            func_name=function.__name__, time=t1 - t0))
        return result
    return function_timer


# NOTE(review): patch_all() also affects the threading/multiprocessing pools
# used below; this is a likely cause of the process-pool hang recorded in
# main() -- confirm before relying on these numbers.
monkey.patch_all()


@fn_timer
def download_using_single_thread(urls):
    """Simulate downloading pages one after another, one second per page.

    :param urls: the pages to download
    :return: the response list; nothing is appended in this simulation, so
        it is always empty
    """
    resps = []
    for index, url in enumerate(urls):
        time.sleep(1)
        print(f'下载第{index+1}个页面:{url}')
    return resps


@fn_timer
def download_using_thread_pool(urls):
    """Download the pages through a pool of four threads.

    :param urls: the pages to download
    :return: list with one response list per submitted task
    """
    pool = thread_pool(4)
    # NOTE(review): [urls] is a single work item, so one worker downloads all
    # pages sequentially (matches the recorded 4.01s). Pass
    # [[url] for url in urls] to actually run the pages in parallel.
    resps = pool.map(download_using_single_thread, [urls])
    pool.close()
    return resps


@fn_timer
def download_using_process_pool(urls):
    """Download the pages through a pool of four processes.

    :param urls: the pages to download
    :return: list with one response list per submitted task
    """
    pool = process_pool(4)
    # NOTE(review): same single-work-item issue as the thread pool version.
    results = pool.map_async(download_using_single_thread, [urls])
    pool.close()
    return results.get()


@fn_timer
def download_using_coroutine_pool(urls):
    """Download the pages on a gevent pool of four greenlets.

    :param urls: the pages to download
    :return: one response list per url
    """
    pool = Pool(4)
    # One single-url list per task, so every page becomes its own work item.
    urls = [[url] for url in urls]
    # Previously computed and then discarded; return it like the other variants.
    return pool.map(download_using_single_thread, urls)


def main():
    urls = [f'{i}.html' for i in range(1, 5)]

    # Recorded output (apparently from download_using_thread_pool(urls)):
    #   下载第1个页面:1.html
    #   下载第2个页面:2.html
    #   下载第3个页面:3.html
    #   下载第4个页面:4.html
    #   [finished function:download_using_single_thread in 4.01s]
    #   [finished function:download_using_thread_pool in 4.01s]

    download_using_process_pool(urls=urls)
    # Author's note: the code may have a problem -- it only runs after a
    # KeyboardInterrupt.
    # Recorded output:
    #   下载第1个页面:1.html
    #   下载第2个页面:2.html
    #   下载第3个页面:3.html
    #   下载第4个页面:4.html
    #   [finished function:download_using_single_thread in 4.00s]

    # Recorded output (apparently from download_using_coroutine_pool(urls)):
    #   下载第1个页面:1.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:2.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:3.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   下载第1个页面:4.html
    #   [finished function:download_using_single_thread in 1.00s]
    #   [finished function:download_using_coroutine_pool in 1.00s]


if __name__ == '__main__':
    main()
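从结果来看,使用协程池的效率还是略高一点。不过需要注意:上面的线程池和进程池传入的是 [urls],整个列表只作为一个任务交给单个 worker 顺序下载(因此耗时约 4 秒),而协程池传入的是 [[url] for url in urls],四个任务并发执行(约 1 秒),所以这个对比并不完全公平。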