Python

概述:Python

[TOC]

Numpy

1
2
import numpy as np
np.__version__

创建随机值及数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 随机数:0-10
a = np.random.randint(10, size=(3, 3))

# 一位数组
print(np.array([1,2,3]))

# 创建二维数组
print(np.array([(1,2,3),(4,5,6)]))

# 创建全零数组
print(np.zeros((3,3)))

# 创建全一数组
print(np.ones((2,3,4)))

# 创建一维等差数组(0 1 2 3 4)
print(np.arange(5))

# 创建二维等差数列
print(np.arange(6).reshape(2,3))

# 创建单位矩阵(二维数组)
print(np.eye(3,3))

# 创建等间隔的一维数组
print(np.linspace(1, 10, num=6))

# 创建二维随机数组
print(np.random.rand(2,3))

# 创建二维随机整数数组(数值小于 5)
print(np.random.randint(5, size=(2,3)))

# 依据自定义函数创建数组
print(np.fromfunction(lambda i, j: i + j, (3, 3)))

# 生成二维示例数组(可以看作矩阵)
A = np.array([[1,2],[3,4]])
B = np.array([[5,6],[7,8]])

# 生成一维示例数组[0:1]
print(np.random.random((3, 2)))

#np.random.rand() 生成一个[0,1)之间的随机浮点数或N维浮点数组

# 生成生成[0,1)之间随机浮点数
print(np.random.rand())

# 生成一个形状为5的一维数组
print(np.random.rand(5))

# 生成2x3的二维数组
print(np.random.rand(2,3))

运算及转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# 矩阵乘法运算
print(np.dot(A, B))

# 使用 np.mat 将二维数组准确定义为矩阵,就可以直接使用 * 完成矩阵乘法计算 = np.dot(A,B)
print(np.mat(A) * np.mat(B))

# 矩阵的转置
print(A.T)

# 矩阵求逆
print(np.linalg.inv(A))

# 三角函数
print(np.sin(np.array([10,20,30,40,50])))

# 以自然对数函数为底数的指数函数
print(np.exp(a))

# 数组的方根的运算(开平方)
print(np.sqrt(a))

# 数组的方根的运算(立方)
print(np.power(a, 3))

# 更改数组形状(不改变原始数组)
a.reshape(2, 3)

# 更改数组形状(会改变原始数组)
a.resize(2, 3)

# 展平数组
a.ravel()

# 垂直拼合数组
print(np.vstack((a, b)))

# 水平拼合数组
print(np.hstack((a, b)))

# 沿纵轴分割数组
print(np.hsplit(a, 3))

# 沿横轴分割数组
print(np.vsplit(a, 3))

# 返回每列最大值
print(np.max(a, axis=0))

# 返回每行最小值
print(np.min(a, axis=1))

# 返回每列最大值索引
print(np.argmax(a, axis=0))

# 统计数组各列的中位数
print(np.median(a, axis=0))

# 数组各行的算术平均值
print(np.mean(a, axis=1))

# 数组各列的加权平均值
print(np.average(a, axis=0))

# 统计数组各行的方差
print(np.var(a, axis=1))

# 统计数组各列的标准偏差
print(np.std(a, axis=0))

# np.random.rand() 生成一个浮点数或N维浮点数组,取数范围:正态分布的随机样本数
print(np.random.randn())

print(np.random.randn(2,3))

# np.random.standard_normal(size=None):生成一个浮点数或N维浮点数组,取数范围:标准正态分布随机样本
print(np.random.standard_normal(2))
print(np.random.standard_normal((2,3)))

# np.random.randint(low, high=None, size=None, dtype='l'):
# 生成一个整数或N维整数数组,取数范围:若high不为None时,取[low,high)之间随机整数,否则取值[0,low)之间随机整数。
print(np.random.randint(2))
print(np.random.randint(2,size=5))
print(np.random.randint(2,6,size=5))

# np.random.choice(a, size=None, replace=True, p=None):
# 从序列中获取元素,若a为整数,元素取值为np.range(a)中随机数;若a为数组,取值为a数组元素中随机元素
print(np.random.choice(2,2))
print(np.random.choice(np.array(['a','b','c','f']),(2,3)))

# np.random.shuffle(x):对X进行重排序,如果X为多维数组,整行重排
list1 = [1,2,3,4,5]
np.random.shuffle(list1)
print(list1)

arr = np.arange(9).reshape(3,3)
print(arr)

np.random.shuffle(arr)
print(arr)

# np.random.permutation(x):与numpy.random.shuffle(x)函数功能相同,两者区别:peumutation(x)不会修改X的顺序
print(np.random.permutation(5))#生成一个range(5)随机顺序的数组

list1 = [1,2,3,4]
np.random.permutation(list1)
print(list1)

pandas

读取/写入文件

1
2
3
4
5
6
7
8
9
# 读取CSV文件
pd.DataFrame.from_csv("csv_file")
# 读取CSV文件
pd.read_csv("csv_file")
# 读取Excel
pd.read_excel("excel_file")
# 将DataFrame写入CSV
# 其中以","为分隔符,不带索引或者使用index=None均可实现
df.to_csv("data.csv", sep=",", index=False)

DF相关信息

1
2
3
4
5
6
7
8
9
10
11
12
13
# DataFrame基本统计信息
# 返回dataframe的每一列/属性的数量、均值、标准差、最小值、四分位数、最大值
df.describe()

# 查看DataFrame的基本信息量
df.info()

# 索引和属性
df.index # 索引
df.columns # 属性

# 重新设置index
df = df.set_index('trade_date')

处理缺失值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 删除:axis=0代表列,axis=1代表行,how='all'表示均为NaN删除,how='any'表示存在某一个NaN则删除
df.dropna(axis=0,how='all')

# 使用全局常量填充缺失值
df=df.fillna(value=0)

# 均值填充
df['a'] = df['a'].fillna(df['a'].means())

# 中位数填充
df['a'] = df['a'].fillna(df['a'].median())

# 众数填充
df['a'] = df['a'].fillna(stats.mode(df['a'])[0][0])

# 用前一个数据进行填充
from scipy.stats import stats
df['a'] = df['a'].fillna(method='pad')

# 用后一个数据进行填充
from scipy.stats import stats
df['a'] = df['a'].fillna(method='bfill')

# 检查缺失值NaN
pd.isnull(df.open)

# 使用" "替换空值
df = df.replace(" ",np.NaN)

操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 删除列
df.drop('open', axis=1)
# 或
del data['open']
# 删除行
df.drop(df.index[[02]], inplace=True

# 将某列转为浮点类型
pd.to_numeric(df["open"], errors='coerce')

# 将DataFrame转为Numpy
df.as_matrix()

# 根据属性和索引定位具体值 df.loc[索引,列名]
# 读取索引为'2019-11-07'的列名'open'的值
df.loc['2019-11-07','open']

# 累加操作
data = data.cumsum()

合并数据集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 生成数据集
df1 = pd.DataFrame(np.ones((3,4))*0, columns=['a','b','c','d'])
df2 = pd.DataFrame(np.ones((3,4))*1, columns=['a','b','c','d'])
df3 = pd.DataFrame(np.ones((3,4))*2, columns=['a','b','c','d'])
# 对多个DataFrame合并操作
res = pd.concat([df1, df2, df3], axis=0)
# 数据输出(axis=0表示纵向合并,axis=1表示横向合并)
# 在数据输出纵向合并时index会有重复,可以使用index_ignore参数修改:
res = pd.concat([df1, df2, df3], axis=0, ignore_index=True)

# 面对不同列名的多个DataFrame合并时,定义两个列名不同的DataFrame
df1 = pd.DataFrame(np.ones((3,4))*0, columns=['a','b','c','d'], index=[1,2,3])
df2 = pd.DataFrame(np.ones((3,4))*1, columns=['b','c','d','e'], index=[2,3,4])
# concat合并时参数join默认为outer,还可选择inner
res = pd.concat([df1, df2], axis=0, join='outer')

# 除了使用concat合并之外,还可以使用append对DataFrame进行纵向合并,注意append仅能纵向合并
df1 = pd.DataFrame(np.ones((3,4))*0, columns=['a','b','c','d'])
df2 = pd.DataFrame(np.ones((3,4))*1, columns=['a','b','c','d'])
res = df1.append(df2, ignore_index=True)

# 实现大于2个DataFrame的append合并:
df1 = pd.DataFrame(np.ones((3,4))*0, columns=['a','b','c','d'])
df2 = pd.DataFrame(np.ones((3,4))*1, columns=['a','b','c','d'])
df3 = pd.DataFrame(np.ones((3,4))*1, columns=['a','b','c','d'])
res = df1.append([df2, df3], ignore_index=True)

数据统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# 1 汇总统计
print(df.describe())

# 2 非NaN值统计
print(df.count())

# 3 最大值和最小值
print(df.min())
print(df.max())

# 4 最大值和最小值所对应的索引值,由于多维度,不指定具体的属性将会报错
# 另外idxmin/idmax仅返回index的下标
print(df['date'][df['open'].idxmin()])
print(df['open'].idxmax())

# 5 数据集的分位数,默认为0.5 显示2/4分位数
# df.quantile(0.75),显示3/4分位数
print(df.quantile())
print(df.quantile(0.75))

# 6 列值总和
print(df.sum())

# 7 各列均值
print(df.mean())

# 8 各列中位数
print(df.mean())

# 9 各列根据平均值计算平均绝对离差,即平均绝对误差(Mean Absolute Deviation)
print(df.mad())

# 10 各列方差
print(df.var())

# 11 各列标准差
print(df.std())

# 12 各列偏度,即三阶标准化矩
print(df.skew())

# 13 各列峰度
print(df.kurt())

# 14 各列累加和
print(df.cumsum())

# 15 各列累加和最大值和最小值
print(df.cummin())
print(df.cumax())

# 16 各列累积
print(df.cumprod())

# 17 各列一阶方差
print(df.diff())

# 18 各列百分数变化
print(df.pct_change())

# 19 修改列名
df.columns = ["f1","f2","f3","f4"]

# 20 随机生成df(默认4列[A,B,C,D])
pd.util.testing.makeDataFrame().head(10)

# 21 将excel等表格转为df
1)复制表格
2)执行pd.read_clipboard

# 22 查看df的当前内存用量
df.info

# 23 限制读取行数
reader = pd.read_csv("http://...",chunksize=10)

# 24 显示所有列
pd.set_option("display.max_columns", None)

# 25 改变浮点数显示位数(1位)
pd.set_option("display.precision",1)

# 26 修改样式
df.style.bar("age",vmin=0)# 年龄属性最小值为0画横向柱状
df.style.hide_index()# 隐藏索引
df.style.highlight_max("label")# 高亮标签

# 选取前n行数据
df.head(n)

# 数据排序
df = df.sort_values("open",ascending = False)
//对具体某列进行排序,其中ascending=False表示降序,而True为升序,其默认ascending为升序

# 过滤数据
df = df[df["open"]>5555]

可视化

使用matplotlib和numpy配合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 单一数据可视化
# 生成1000个随机数并数据可视化
import pandas as pd
import matplotlib.pyplot as plt
data = pd.Series(np.random.randn(1000),index=np.arange(1000))
data.plot()
plt.show()

# 多维数据可视化
# 生成1000×4维度的随机数并数据可视化
data = pd.DataFrame(np.random.randn(1000,4),index=np.arange(1000),columns=list("ABCD"))
data.plot()
plt.show()

# 对多维数据进行散点图展示
ax = data.plot.scatter(x='A', y='B', color='DarkBlue', label="Class 1(A/B)")
bx = data.plot.scatter(x='A', y='C', color='LightBlue', label="Class 2(A/C)",ax=ax)
data.plot.scatter(x='A', y='D', color='LightGreen', label='Class 3(A/D)', ax=bx)
plt.show()

在Python中一切皆为对象,所有变量被赋值后均遵循对象引用机制,在运行时需要再内存中开辟一个空间,计算完成后将结果输出至永久性存储器,当数据量过大时易出现OOM(out of memory),即内存爆炸。

当一个对象不再被调用时,当该对象的引用计数指针数为0,需要被回收,可以通过以下命令查看指定变量被引用的次数:

1
2
3
4
import sys
a = 123
print(sys.getrefcount(a))
# result:2 一次来源于声明a变量;一次来源于getrefcount(a)

手动释放内存:

1
2
3
import gc
gc.collect()
show_memory_info('collected')

当两个局部变量循环引用时,即当a与b互相引用时,即使函数完成后其引用数也不为0,对于该情况时可以采取人工回收。

垃圾回收

计数引用

当一个对象不再调用的时候,也就是当这个对象的引用计数(指针数)为 0 的时候,说明这个对象永不可达,自然它也就成为了垃圾,需要被回收。可以简单的理解为没有任何变量再指向它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import os
import psutil

# 显示当前 python 程序占用的内存大小
def show_memory_info(hint):
pid = os.getpid()
p = psutil.Process(pid)

info = p.memory_full_info()
memory = info.uss / 1024./ 1024
print('{} memory used: {} MB'.format(hint, memory))


def func():
show_memory_info('initial')
a = [i for i in range(10000000)]
show_memory_info('after a created')

func()
show_memory_info('finished')

########## 输出 ##########
initial memory used: 47.19140625 MB
after a created memory used: 433.91015625 MB
finished memory used: 48.109375 MB

可以看到调用函数 func(),在列表 a 被创建之后,内存占用迅速增加到了 433 MB:而在函数调用结束后,内存则返回正常。这是因为,函数内部声明的列表 a 是局部变量,在函数返回后,局部变量的引用会注销掉;此时,列表 a 所指代对象的引用数为 0,Python 便会执行垃圾回收,因此之前占用的大量内存就又回来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def
func():
show_memory_info('initial')
global a
a = [i for i in range(10000000)]
show_memory_info('after a created')

func()
show_memory_info('finished')

########## 输出 ##########

initial memory used: 48.88671875 MB
after a created memory used: 433.94921875 MB
finished memory used: 433.94921875 MB

新的这段代码中,global a 表示将 a 声明为全局变量。那么,即使函数返回后,列表的引用依然存在,于是对象就不会被垃圾回收掉,依然占用大量内存。同样,如果我们把生成的列表返回,然后在主程序中接收,那么引用依然存在,垃圾回收就不会被触发,大量内存仍然被占用着:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def
func():
show_memory_info('initial')
a = [i for i in derange(10000000)]
show_memory_info('after a created')
return a

a = func()
show_memory_info('finished')

########## 输出 ##########

initial memory used: 47.96484375 MB
after a created memory used: 434.515625 MB
finished memory used: 434.515625 MB

循环回收

如果有两个对象,它们互相引用,并且不再被别的对象所引用,那么它们应该被垃圾回收吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def
func():
show_memory_info('initial')
a = [i for i in range(10000000)]
b = [i
for i in range(10000000)]
show_memory_info('after a, b created')
a.append(b)
b.append(a)

func()
show_memory_info('finished')

########## 输出 ##########

initial memory used: 47.984375 MB
after a, b created memory used: 822.73828125 MB
finished memory used: 821.73046875 MB

从结果显而易见,它们并没有被回收,但是从程序上来看,当这个函数结束的时候,作为局部变量的a,b就已经从程序意义上不存在了。但是因为它们的互相引用,导致了它们的引用数都不为0。

这时要如何规避呢1. 从代码逻辑上进行整改,避免这种循环引用2. 通过人工回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import gc

def func():
show_memory_info('initial')
a = [i for i in range(10000000)]
b = [i for i in range(10000000)]
show_memory_info('after a, b created')
a.append(b)
b.append(a)

func()
gc.collect()
show_memory_info('finished')

########## 输出 ##########

initial memory used: 49.51171875 MB
after a, b created memory used: 824.1328125 MB
finished memory used: 49.98046875 MB

python面对循环引用有自动垃圾回收算法:

(1)自动垃圾回收算法——标记清除(mark-sweep)

垃圾回收机制会把所有变量都打上标记,将没有标记的对象进行回收,mark-sweep使用双向链表维护一个数据结构,通常仅包含容器类对象:list、dict、tuplle、instance。

(2)自动垃圾回收算法——分代回收(generational)

分代回收是一种以空间换时间的操作,python根据对象的存活时间划分为不同的集合,其中每一个集合称为一个代,将内存分为3个代,分别为:

  • 年轻代(第0代)

  • 中年代(第1代)

  • 老年代(第2代)

对应3个链表,对象的存活时间越大,垃圾收集频率越低。

新创建的对象均被分配在年轻代,当年轻代链表总数达到上限时,python垃圾回收机制会被触发,将待回收对象回收,将暂不回收被移到中年代,老年代的对象是存活时间最久的对象。

random

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import random
# 随机生成0-1的随机数
random.random()#0.444365931841868
# 随机生成2-5之间的整数
random.randint(2,5)#3
# 随机生成2-5之间的有理数
random.uniform(2,5)#2.391457672495461
# 从[10,12,14,...,100]随机生成1个数
random.randrange(10,100,2)#72
# 从['a','b','c']随机生成1个项
random.choice(['a','b','c',1])#'c'
# 打乱列表
p = ['python','is','powerful']
random.shuffle(p)
print(p)# ['powerful', 'is', 'python']
# 打乱顺序且随机选取3个数
random.sample([1,2,3,4,5],3)#[1, 3, 2]
# 随机选取3个数
random.sample([1,2,3,4,5],3)#[1, 3, 2]
# 指定前5次随机生成数相同
random.seed(5)

🔧工具 —— pycharm 解决冲突

1
2
3
4
5
Right Click
> Git
> Resolve Conflicts ...
> Double Click File
> 选择代码行

当前运行目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os

# 该文件所在位置:D:\第1层\第2层\第3层\第4层\第5层\test11.py

path0 = os.path.abspath(__file__)
print(f'path0: {path0}') # 获取当前运行脚本的绝对路径

path1 = os.path.dirname(__file__)
print(f'path1: {path1}') # 获取当前运行脚本的目录绝对路径

path2 = os.path.dirname(os.path.dirname(__file__)) #
print(f'path2: {path2}') # 获取当前运行脚本的目录绝对路径(去掉最后一个路径)

path3 = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
print(f'path3: {path3}') # 获取当前运行脚本的目录绝对路径(去掉最后2个路径)

path4 = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
print(f'path4: {path4}') # 获取当前运行脚本的目录绝对路径(去掉最后3个路径)

path5 = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
print(f'path5: {path5}') # 获取当前运行脚本的绝对路径(去掉最后4个路径)

path6 = os.__file__ # 获取os所在的目录
print(f'path6: {path6}')

# ⬇️ RUN RESULT ⬇️
# path0: /Users/junmingguo/Documents/mytest/A.py
# path1: /Users/junmingguo/Documents/mytest
# path2: /Users/junmingguo/Documents
# path3: /Users/junmingguo
# path4: /Users
# path5: /
# path6: /Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/os.py

from pathlib import Path
print(Path(__file__))
# /Users/junmingguo/Documents/mytest/A.py
print(Path(__file__).resolve()) # 转换为绝对路径
# /Users/junmingguo/Documents/mytest/A.py
print(Path(__file__).resolve().parent)
# /Users/junmingguo/Documents/mytest
print(Path(__file__).resolve().parent.parent)
# /Users/junmingguo/Documents

合并字典

1
2
3
4
5
6
7
x = {'a': 1, 'b': 2}
y = {'b': 3, 'c': 4}

z = {**x, **y} # {'a': 1, 'b': 3, 'c': 4}
或者
z = x.copy()
z.update(y)

环境变量

1
2
3
4
5
6
7
# 输出
import os
print(os.environ.get("ENDPOINT"))

# 导入(本地环境配置,实际使用时请自行导入环境变量)
from dotenv import load_dotenv
load_dotenv()

输出当前类及方法

1
2
location = f'{self.__class__.__name__}/{sys._getframe().f_code.co_name}'
# 当前运行的 类 及 方法

线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import time

def loop():
for i in range(5):
time.sleep(1)
print(i)

class MyThread(threading.Thread):
def run(self):
loop()

if __name__ == '__main__':
t1 = MyThread().start()
t2 = MyThread().start()
print("执行完毕")
# ⬇️ RUN RESULT ⬇️
# 执行完毕
# 00
#
# 1
# 1
# 2
# 2
# 3
# 3
# 4
# 4

Testcase

1
2
3
4
5
6
7
8
9
10
11
net_area: 指定网络区域
pre_test():初始化
run_test():测试用例
post_run():释放资源
使用debug_run(): 调用运行
公共部分代码放到lib包

# TestCase的SetUp和setUpTestData 区别
you should use setUpTestData for the shared data and setUp for the per-test-method client

setUp是每一个测试方法前都要运行,而setUpTestData是在数据初始化时运行

定时任务_schedule

python任务定时运行库 schedule 模块 schedule

(1)例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import schedule
import time

def job():
print("I'm working...")

def job1(name): #带参数
print(name)

schedule.every(10).minutes.do(job)           #每10分钟执行一次
schedule.every().hour.do(job)           #每小时执行一次
schedule.every().day.at("10:30").do(job)      #每天10:30执行一次
schedule.every().monday.do(job)   #每周星期一执行一次
schedule.every().wednesday.at("13:15").do(job) #每周星期三执行一次
schedule.every().wednesday.at("13:15").do(job1,'waiwen') #传入参数

while True:
schedule.run_pending()
time.sleep(1)

(2)并行执行:为每一任务创建一个线程,使得任务并行工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading
import time
import schedule

def job():
print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while True:
schedule.run_pending()

(3)使用队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import Queue
import time
import threading
import schedule


def job():
print("I'm working")

def worker_main():
while 1:
job_func = jobqueue.get()
job_func()
jobqueue.task_done()

jobqueue = Queue.Queue()

schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while True:
schedule.run_pending()

(4)仅执行一次

1
2
3
4
5
def job_that_executes_once():
# Do some work ...
return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

(5)取消任务

1
2
3
4
5
6
7
8
9
10
11
12
def greet(name):
print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

schedule.clear('daily-tasks')

while True:
schedule.run_pending()

补充 - Ubuntu - crontab

1
2
* * * * * source ~/.virtualenvrc && workon cd_sdk && python cipipeline.py #一分钟执行一次
*/2 * * * * #两分钟执行一次

方法

  • 类方法 @classmethod

描述:classmethod修饰符对应的函数不需要实例化,不需要 self 参数,但第一个参数需要是表示自身类的 cls 参数,可以来调用类的属性,类的方法,实例化对象等。

@classmethod要与cls配合使用

1
2
3
4
5
6
7
8
9
10
11
class A(object):
bar = 1
def func1(self):
print ('foo')
@classmethod
def func2(cls):
print ('func2')
print (cls.bar)
cls().func1() # 调用 foo 方法

A.func2() # 不需要实例化
  • 静态方法 @staticmethod

描述:当不需要引用类或者实例时,建议将方法定义为静态方法。

  • 实例方法

描述:实例方法只能被实例对象调用

实例方法要与self配合使用

装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time

def timer(func):
'''统计函数运行时间的装饰器'''
def wrapper():
start = time.time()
func() # 被装饰的函数
end = time.time()
used = end - start
print(f'{func.__name__} used {used}')
return wrapper


def step1():
print('step1.......')

def step2():
print('step2......')

def step3():
print('step3......')

timed_step1 = timer(step1)
timed_step2 = timer(step2)
timed_step3 = timer(step3)
timed_step1()
timed_step2()
timed_step3()

语法糖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@timer
def step1():
print('step1.......')

@timer
def step2():
print('step2......')

@timer
def step3():
print('step3......')

step1()
step2()
step3()

带参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@timer
def step1(num):
print(f'我走了#{num}步')

step1(5)
# 会报错,因为python先调用wrapper函数,不接受参数而报错,在wrapper加参数
# --------------------------------------------------------------
import time

def timer(func):
'''统计函数运行时间的装饰器'''
def wrapper(*args, **kwargs):
start = time.time()
func(*args, **kwargs) # 被装饰的函数
end = time.time()
used = end - start
print(f'{func.__name__} used {used}')
return wrapper

被装饰的函数有返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def timer(func):
'''统计函数运行时间的装饰器'''
def wrapper(*args, **kwargs):
start = time.time()
ret_value = func(*args, **kwargs) # 获取被装饰的函数返回值
end = time.time()
used = end - start
print(f'{func.__name__} used {used}')
return ret_value # return 返回值
return wrapper

@timer
def add(num1, num2):
return num1 + num2

sum = add(5, 8)
print(sum)

copy

浅拷贝:仅复制被拷贝对象原先的数据,并指向相同的存储位置(共用一部分数据存储位置)

![image-20210107191323129](/Users/junmingguo/Library/Application Support/typora-user-images/image-20210107191323129.png)

被拷贝的数据中存在可变数据,浅拷贝的对象与被拷贝对象保持一致

图片

若修改不可变数据(例如int类型),则浅拷贝的对象不更新

![image-20210107191613395](/Users/junmingguo/Library/Application Support/typora-user-images/image-20210107191613395.png)

深拷贝:复制一个完全独立的对象

1
2
3
4
import copy
l1 = [1, 2, 3, [22, 33]]
l2 = copy.deepcopy(l1)
l1.append(666)

![image-20210107191722877](/Users/junmingguo/Library/Application Support/typora-user-images/image-20210107191722877.png)

deepcopy 使用示例

1
2
3
4
5
6
7
# list/dict: 直接赋值会影响原先 list/dict
a = {1:1, 2:2}
b = a
b[3] = 3
print(a) # {1:1, 2:2, 3:3}
print(b) # {1:1, 2:2, 3:3}
# 需要使用deepcopy

JSON loads & dumps

dumps : 将Python对象转为JSON

python对象到json字符串的转换规则:

Python JSON
dict object
list, tuple array
str, unicode string
int, long, float number
True true
False false
None null

pythonlist2json

1
json.dumps(mylist,ensure_ascii=False)  # 如果涉及中文需要设置ensure_ascii

eg.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#构造字典
python2json = {}
#构造list
listData = [1,2,3]
python2json["listData"] = listData
python2json["strData"] = "test python obj 2 json"
python2json['bool'] = False

import json
#转换成json字符串
json_str = json.dumps(python2json)
print(json_str)

# {"listData": [1, 2, 3], "strData": "test python obj 2 json", "bool": false}

Loads : 将JSON转为Python对象

1
2
3
4
5
6
7
8
import json
str = '{"listData": [1, 2, 3], "bool": false, "strData": "test python obj 2 json"}'
json2python = json.loads(str)
print (type(json2python))
print(json2python)

# <class 'dict'>
# {'listData': [1, 2, 3], 'bool': False, 'strData': 'test python obj 2 json'}

绝对导入

from __future__ import absolute_import

Python 2.4或之前默认是相对引用,即先在本目录下寻找模块。

absolute_import作用:若本目录中有模块名与系统(sys.path)模块同名冲突,而想要引用的是系统模块时,该声明就起作用了,仅用于python2系列。

调用import string时引入的就是系统的标准string.py

调用from pkg import string来引入当前目录的string.py(pkg为你当前文件夹名称)

注意:并不建议使用与内置库名称作为文件名

_

  1. 以单个下划线开头的变量或方法仅供内部使用,其中针对变量只是约定而已,而对于方法而言,在使用from abc import *时将无法导入单下划线开头的方法,常规导入import abc,可以正常导入。

  2. 以单个下划线结尾的变量或者方法,像class或def这样的名称不能用作Python中的变量名称

  3. 以双个下划线开头的变量,例如class的变量,使用dir时将无法直接获取对应的变量名,而是_类名__变量,称为名称修饰,防止变量在子类中被重写,双下划线名称修饰对程序员是完全透明的。

  4. 双前导和双末尾下划线的名称,用于特殊用途。这样的例子有,__init__对象构造函数,或__call__ — 它使得一个对象可以被调用。

单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = object.__new__(cls, *args, **kwargs)
return cls._instance

# coding: utf-8
from functools import wraps


def handle_client_exception(OperationException):
"""
处理客户端异常
"""
def decorate(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
response = func(*args, **kwargs)
if hasattr(response, 'code') and response.code:
raise OperationException(
error_code=response.code,
func_name=func.__name__,
message=response.message
)
except Exception as e: # pylint: disable=broad-except
raise
return response
return wrapper
return decorate

init & new

1
2
3
4
5
6
7
8
9
10
11
12
13
# 知识点: __init__,创建方法后默认调用,不需要返回值(不允许返回非None值)
# __new__必须包含参数cls,表示当前类,且必须要有返回值,可以return 父类
# 先执行__new__后自动调用__init__
class A(object): # -> don't forget the object specified as base

def __new__(cls):
print ("A.__new__ called")
return super(A, cls).__new__(cls)

def __init__(self):
print ("A.__init__ called")

A()

property

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Student(object):
def __init__(self):
self._age = None

def age_getter(self):
return self._age

def age_setter(self, age):
self._age = age

age = property(age_getter, age_setter) # property是一种特殊的赋值__init__的方法

s=Student()
print(s.age_setter(11))
print(s.age_getter())

s.age = 22
print(s.age)
print(s._age)

装饰器来实现getter、setter、和deleter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Student(object):
def __init__(self):
self._age = None

@property
def age(self):
return self._age

@age.setter
def age(self, age):
self._age = age

@age.deleter
def age(self):
del self._age


student = Student()
student.age = 20
print (student.age)
del student.age

消除警告

1
2
import warnings
warnings.filterwarnings("ignore")

os

os.remove()报错:限权不够

1
2
3
import shutil

shutil.rmtree(file_name)

获取当前工作目录,即当前Python脚本工作的目录路径

1
os.getcwd()

获取指定工作目录的文件名

1
os.listdir()

删除指定文件

1
os.remove()

删除多个目录

1
os.removedirs(r"c:\python")

小数点相加误差

1
2
3
4
from decimal import getcontext,Decimal
# 精确到小数点后几位
getcontext().prec = 2
print(Decimal(0.1)+Decimal(0.2))

CSV编码

保存csv文件时中文乱码

1
data.to_csv('basic.csv',encoding="utf_8_sig")

读取csv文件的中文编码格式

1
2
3
4
5
6
7
import pandas as pd
# 读取csv文件(Dataframe格式)
fund_return = pd.read_csv('train_fund_return.csv')
# 涉及到中文读取编码错误
index_return = pd.read_csv('train_index_return.csv',encoding='gb18030')
# 或者gb2312
print(index_return)

储存为csv文件时不添加index

1
2
3
4
5
6
7
import tushare as ts
stock_data = ts.get_k_data('002253')
# 删除指定列
del stock_data['code']
# 存入csv时不添加index
stock_data.to_csv('002253.csv',index=None)
# 或者index=False

单行实现嵌套for循环

1
2
3
4
5
6
7
list1 = range(1,3)
list2 = range(4,6)
list3 = range(7,9)
for item1 in list1:
for item2 in list2:
for item3 in list3:
print(item1+item2+item3)
1
2
3
4
5
6
from itertools import product
list1 = range(1,3)
list2 = range(4,6)
list3 = range(7,9)
for item1,item2,item3 in product(list1, list2, list3):
print(item1+item2+item3)

计算函数运行时间

1
2
3
4
5
6
7
8
import time

start = time.time()

# run the function

end = time.time()
print(end-start)

更简单的计算方法

1
2
3
4
5
6
7
8
9
import time
import timeit

def run_sleep(second):
print(second)
time.sleep(second)

# 只用这一行
print(timeit.timeit(lambda :run_sleep(2), number=5))

列表解包

1
2
3
array = [['a', 'b'], ['c', 'd'], ['e', 'f']]
transposed = zip(*array)
print(list(transposed))

合并字典

1
2
3
4
5
dicta,dictb
dictc = dicta.copy()
dictc.update(dictb)
或者
{**dicta, **dictb}

运行过程输出源代码

1
2
3
4
5
6
7
8
import inspect


def add(x, y):
return x + y

print("=====source code=====")
print(inspect.getsource(add))

时间戳

时间格式→时间戳

1
2
3
4
5
6
7
import time
dt = "2016-05-05 20:28:54"
#转换成时间数组
timeArray = time.strptime(dt, "%Y-%m-%d %H:%M:%S")
#转换成时间戳
timestamp = time.mktime(timeArray)
print (timestamp)

时间戳→时间格式

1
2
3
4
5
6
timestamp = 1462451334
# 转换成localtime
time_local = time.localtime(timestamp)
# 转换成新的时间格式(2016-05-05 20:28:54)
dt = time.strftime("%Y-%m-%d %H:%M:%S",time_local)
print (dt)

字符串转换

  • b’input\n’ # bytes字节符,打印以b开头

  • r’input\n’ # 非转义原生字符,经处理’\n’变成了’\‘和’n’。也就是\n表示的是两个字符,而不是换行

  • u’input\n’ # unicode编码字符,python3默认字符串编码方式。

1
2
3
4
5
6
7
# bytyes -> str 
a = b'sql\xe6\x89\xa7\xe8\xa1\x8c\xe5\xa4\xb1\xe8\xb4\xa5'
c = a.decode("utf-8") # 中文

# bytyes -> str
a = u'\u5f55\u5165\u4ee3\u7801\u5e93'
print(a) # 中文

pip下载源优先顺序调整

1
2
find ~ -name 'pip.con'
vi /Users/junmingguo/.pip/pip.conf
1
2
3
4
5
6
7
8
9
10
[global]
index-url = http://mirrors.example.com/repository/simple/
extra-index-url = http://pypi.example.com/simple/
https://mirrors.example.com/pypi/simple/

trusted-host = mirrors.abc.example.com
mirrors.example.com
pypi.example.com
no-cache-dir = true
disable-pip-version-check = true

其中index-url为第一下载源,extra-index-url为备用下载源

反射

定义:通过字符串映射object对象的方法或者属性

方法:

1
2
3
4
hasattr(obj,name_str): 判断objec是否有name_str这个方法或者属性
getattr(obj,name_str): 获取object对象中与name_str同名的方法或者函数
setattr(obj,name_str,value): 为object对象设置一个以name_str为名的value方法或者属性
delattr(obj,name_str): 删除object对象中的name_str方法或者属性

*arg **kwargs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def demo(a, *args, **kwargs):
print(a)
print(args)
print(kwargs)


demo(1)
# 1
# None
# None

demo(1, 2, 3)
# 1
# (2, 3)
# None

demo(a=1, name='jm')
# 1
# None
# {'name': 'jm'}

python2 & 3 下的range()

1
2
3
4
5
6
# python2 返回列表
range(2)
# [0,1]
# python3 返回迭代器(节约内存)
range(2)
# range(0,2)

避免转义字符

1
2
3
r'abc\ndef'  # abc\ndef
'abc\ndef' # abc
# def

根据字典键值进行排序

1
2
a={1:2, 2:1, 3:3, 4:0}
b=dict(sorted(a.items(), key=lambda item: item[1]))

str.isalpha

python2 下对’中文’进行isalpha则返回False

python3 下对’中文’ 进行isalpha则返回True,要实现与python2一致的逻辑则需要进行转义

1
2
3
4
5
6
# python2
a = "runoob"
print a.isalpha()

b = "runoob菜鸟教程"
print b.isalpha()
1
2
3
# python3
a = "runoob"
print(a.encode('UTF-8').isalpha())

(.*?) & (.*)区别

1
2
3
4
5
6
7
8
9
# (.*) 是贪婪匹配:满足正则的尽可能地多匹配
# (.*?) 是非贪婪匹配:满足正则的尽可能地少匹配

import re
s="<a>哈哈</a><a>嘿嘿</a>"
print(re.findall("<a>(.*)</a>",s))
# ['哈哈</a><a>嘿嘿']
print(re.findall("<a>(.*?)</a>",s))
# ['哈哈', '嘿嘿']

int()强制转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 转换成int类型的示例
int(1.2)
int(-1.2)
int(+1.2)

int("1")
int("-1")
int("1.2")

# 方法参数及注释
def __init__(self, x, base=10): # known special case of int.__init__
"""
int([x]) -> integer
int(x, base=10) -> integer

Convert a number or string to an integer, or return 0 if no arguments
are given. If x is a number, return x.__int__(). For floating point
numbers, this truncates towards zero.

If x is not a number or if base is given, then x must be a string,
bytes, or bytearray instance representing an integer literal in the
given base. The literal can be preceded by '+' or '-' and be surrounded
by whitespace. The base defaults to 10. Valid bases are 0 and 2-36.
Base 0 means to interpret the base from the string as an integer literal.
>>> int('0b100', base=0)
4
# (copied from class doc)
"""
pass
# 理解
int(x, base=10)
1. 若x非数字则必须为字符串或者字节、字节数组实例
2. base表示进制,默认为10,有效范围是[0,2-36],0表示将字符串转换为整型,无论base值为多少,转换后的数值为十进制整数。

不支持double字段

1
double(0.8)  # 报错

[index:]数组索引不存在

1
2
a = [1,2,3]
b = a[10:] # 输出 b = [],在获取列表切片时若begin index超过成员变量不会触发indexerror

complex

complex是复数类型,由实部和虚部组成,即real+imag

1
2
3
4
5
6
complex(1, 2)
# (1+2j)
complex(1)
# (1+0j)
complex("1")
# (1)

groupby & itemgetter

分组函数 groupby

itemgetter获取字典key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from itertools import groupby
from operator import itemgetter

from operator import itemgetter
from itertools import groupby
d1={'name':'zhangsan','age':20,'country':'China'}
d2={'name':'wangwu','age':19,'country':'USA'}
d3={'name':'lisi','age':22,'country':'JP'}
d4={'name':'zhaoliu','age':22,'country':'USA'}
d5={'name':'pengqi','age':22,'country':'USA'}
d6={'name':'lijiu','age':22,'country':'China'}
lst=[d1,d2,d3,d4,d5,d6]
# [{'name': 'zhangsan', 'age': 20, 'country': 'China'},
# {'name': 'wangwu', 'age': 19, 'country': 'USA'},
# {'name': 'lisi', 'age': 22, 'country': 'JP'},
# {'name': 'zhaoliu', 'age': 22, 'country': 'USA'},
# {'name': 'pengqi', 'age': 22, 'country': 'USA'},
# {'name': 'lijiu', 'age': 22, 'country': 'China'}]

lst.sort(key=itemgetter('age')) #需要先排序,然后才能groupby。lst排序后自身被改变
print(lst)
# [{'name': 'wangwu', 'age': 19, 'country': 'USA'},
# {'name': 'zhangsan', 'age': 20, 'country': 'China'},
# {'name': 'lisi', 'age': 22, 'country': 'JP'},
# {'name': 'zhaoliu', 'age': 22, 'country': 'USA'},
# {'name': 'pengqi', 'age': 22, 'country': 'USA'},
# {'name': 'lijiu', 'age': 22, 'country': 'China'}]

lstg = groupby(lst,itemgetter('country'))

for key,group in lstg:
print (key,list(g)) # group是一个迭代器,包含了所有的分组列表
# 19 [{'name': 'wangwu', 'age': 19, 'country': 'USA'}]
# 20 [{'name': 'zhangsan', 'age': 20, 'country': 'China'}]
# 22 [{'name': 'lisi', 'age': 22, 'country': 'JP'}, {'name': 'zhaoliu', 'age': 22, 'country': 'USA'}, {'name': 'pengqi', 'age': 22, 'country': 'USA'}, {'name': 'lijiu', 'age': 22, 'country': 'China'}]

urljoin

通过举例查看规律,第一个参数为base,将至少保留至.com之前(若url不为空则base部分以/结尾),根据第二个参数url来填补,若url部分不包含/则直接替换或者补充至base后续,若url部分包含/则直接填补至base后续。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from urllib.parse import urljoin

print(urljoin("http://www.chachabei.com/folder/currentpage.html", "anotherpage.html"))
# 'http://www.chachabei.com/folder/anotherpage.html'
print(urljoin("http://www.chachabei.com/folder/currentpage.html", "/anotherpage.html"))
# 'http://www.chachabei.com/anotherpage.html'
print(urljoin("http://www.chachabei.com/folder/currentpage.html", "folder2/anotherpage.html"))
# 'http://www.chachabei.com/folder/folder2/anotherpage.html'
print(urljoin("http://www.chachabei.com/folder/currentpage.html", "/folder2/anotherpage.html"))
# 'http://www.chachabei.com/folder2/anotherpage.html'
print(urljoin("http://www.chachabei.com/abc/folder/currentpage.html", "/folder2/anotherpage.html"))
# 'http://www.chachabei.com/folder2/anotherpage.html'
print(urljoin("http://www.chachabei.com/abc/folder/currentpage.html", "../anotherpage.html"))
# 'http://www.chachabei.com/abc/anotherpage.html'
print(urljoin("", "../anotherpage.html"))
# anotherpage.html

importlib

通过字符串名导入模块

1
2
3
4
5
6
import importlib
math = importlib.import_module('math')
math.sin(2)
# 0.9092974268256817
mod = importlib.import_module('urllib.request')
u = mod.urlopen('http://www.baidu.com')

import_module只是简单地执行和import相同的步骤,但是返回生成的模块对象。你只需要将其存储在一个变量,然后像正常的模块一样使用。

相对导入:

1
2
3
import importlib
# Same as 'from . import b'
b = importlib.import_module('.b', __package__)

@property

1
2
3
4
5
6
7
8
9
class Student(object):    
def get_score(self):        
return self._score    
def set_score(self, value):        
if not isinstance(value, int):            
raise ValueError('score must be an integer!')        
if value < 0 or value > 100:            
raise ValueError('score must between 0 ~ 100!')        
self._score = value
1
2
3
s = Student()
s.set_score(60)
s.get_score()  # 60

使用@property,可作为属性而不需要加( )以方法形式返回,类似于对象的字段

1
2
3
4
5
6
7
8
9
10
11
class Student(object):    
@property    
def score(self):        
return self._score    
@score.setter    
def score(self, value):        
if not isinstance(value, int):            
raise ValueError('score must be an integer!')        
if value < 0 or value > 100:            
raise ValueError('score must between 0 ~ 100!')        
self._score = value

进程、线程、协程

进程(Process)**是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

线程,被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。

协程:一个程序可以包含多个协程,可以对比于一个进程包含多个线程,因而下面我们来比较协程和线程:我们知道多个线程相对独立,有自己的上下文,切换受系统控制;而协程也相对独立,有自己的上下文,但是其切换由自己控制,由当前协程切换到其他协程由当前协程来控制。

守护线程与用户线程

用户线程:我们平常创建的普通线程。

守护线程:用来服务于用户线程;不需要上层逻辑介入。

在JAVA中,当线程只剩下守护线程的时候,JVM就会退出;补充一点如果还有其他的任意一个用户线程还在,JVM就不会退出。

eg.多线程

听一首音乐假如耗时1秒,看一部电影假如耗时5秒,用两个函数定义这两个任务如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time
from functools import wraps

def fn_timer(function):
'''
函数计时装饰器
:param function: 函数对象
:return: 装饰器
'''
@wraps(function)
def function_timer(*args,**kwargs):
# 起始时间
t0 = time.time()
# 调用函数
result = function(*args,**kwargs)
# 结束时间
t1 = time.time()
# 打印函数耗时
print ('[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0))
return result
return function_timer

# 耗时任务:听音乐
def music(name):
print 'I am listening to music {0}'.format(name)
time.sleep(1)

# 耗时任务:看电影
def movie(name):
print 'I am watching movie {0}'.format(name)
time.sleep(5)

方案一:先一个个听完10首音乐,再一个个看完2部电影,顺序完成,代码如下:

1
2
3
4
5
6
7
# 单线程操作:顺序执行听10首音乐,看2部电影
@fn_timer
def single_thread():
for i in range(10):
music(i)
for i in range(2):
movie(i)

让我们执行一下这段代码,输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
I am listening to music 0
I am listening to music 1
I am listening to music 2
I am listening to music 3
I am listening to music 4
I am listening to music 5
I am listening to music 6
I am listening to music 7
I am listening to music 8
I am listening to music 9
I am watching movie 0
I am watching movie 1
[finished function:single_thread in 20.14s]

方案二:同时听多首音乐,同时看多部电影进行,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading

# 多线程执行:听10首音乐,看2部电影
@fn_timer
def multi_thread():
# 线程列表
threads = []
for i in range(10):
# 创建一个线程,target参数为任务处理函数,args为任务处理函数所需的参数元组
threads.append(threading.Thread(target = music,args = (i,)))
for i in range(2):
threads.append(threading.Thread(target = movie,args = (i,)))

for t in threads:
# 设为守护线程
t.setDaemon(True)
# 开始线程
t.start()
for t in threads:
t.join()

multi_thread()

执行上述代码,运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
I am listening to music 0
I am listening to music 1
I am listening to music 2
I am listening to music 3
I am listening to music 4
I am listening to music 5
I am listening to music 6
I am listening to music 7
I am listening to music 8
I am listening to music 9
I am watching movie 0
I am watching movie 1
[finished function:multi_thread in 5.02s]

这次只用了5秒就完成了,完成效率显著提升。这次试用多线程执行多个任务,所有任务最终的总耗时 = 耗时最长的那个单个任务的耗时,即看一部电影的5秒钟时间。

方案三:使用线程池。上面使用多线程的方式比较繁琐,下面使用线程池来实现:

1
2
3
4
5
6
7
8
9
10
11
12
# 使用线程池执行:听10首音乐,看2部电影
from multiprocessing import Pool
@fn_timer
def use_pool():
# 设置线程池大小为20,如果不设置,默认值是CPU核心数
pool = Pool(20)
pool.map(movie,range(2))
pool.map(music,range(10))
pool.close()
pool.join()

use_pool()

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
I am listening to music 0
I am listening to music 1
I am listening to music 2
I am listening to music 3
I am listening to music 4
I am listening to music 5
I am listening to music 6
I am listening to music 7
I am listening to music 8
I am listening to music 9
I am watching movie 0
I am watching movie 1
[finished function:use_pool in 6.12s]

可以看出使用线程池反而比手工调度线程多耗时一秒钟,可能是因为线程池内部对线程的调度和线程切换的耗时造成的。

eg. 多进程和进程池的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# coding:utf-8
# 测试多进程
import os
import time
from multiprocessing import Process,Pool

from functools import wraps

def fn_timer(function):
'''
函数计时装饰器
:param function: 函数对象
:return: 装饰器
'''
@wraps(function)
def function_timer(*args,**kwargs):
# 起始时间
t0 = time.time()
# 调用函数
result = function(*args,**kwargs)
# 结束时间
t1 = time.time()
# 打印函数耗时
print ('[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0))
return result
return function_timer

# 简单的任务
@fn_timer
def do_simple_task(task_name):
print ('Run child process {0}, task name is: {1}'.format(os.getpid(),task_name))
time.sleep(1.2)
return task_name

@fn_timer
# 1. 测试简单的多进程
# 每一个进程占据一个CPU,所以要建立的进程必须小于等于CPU的个数。如果启动进程数过多,特别是当遇到CPU密集型任务,会降低并行的效率。
def test_simple_multi_process():
p1 = Process(target=do_simple_task, args=('task1',))
p2 = Process(target=do_simple_task, args=('task2',))
print ('Process will start...')
p1.start()
p2.start()
p1.join()
p2.join()
print ('Process end.')

@fn_timer
# 2. 测试使用进程池
def test_use_process_pool():
# 创建一个进程池,数字表示一次性同时执行的最大子进程数
pool = Pool(5)
# 任务名称列表
task_names = []
for i in range(7):
# 因为进程池最大子进程数为5,所以会被分成2批并行运行进程
task_names.append('task{0}'.format(i))
# 并发执行多个任务,并获取任务返回值
results = pool.map_async(do_simple_task,task_names)
print ('Many processes will start...')
pool.close()
pool.join()
print ('All processes end, results is: {0}'.format(results.get()))

def main():
test_simple_multi_process()
# 输出:
'''
Process will start...
Run child process 45824, task name is: task1
Run child process 45825, task name is: task2
[finished function:do_simple_task in 1.20s]
[finished function:do_simple_task in 1.20s]
Process end.
[finished function:test_simple_multi_process in 1.21s]
'''

test_use_process_pool()
# 输出:
'''
Many processes will start...
Run child process 45826, task name is: task0
Run child process 45827, task name is: task1
Run child process 45828, task name is: task2
Run child process 45829, task name is: task3
Run child process 45830, task name is: task4
[finished function:do_simple_task in 1.20s]
[finished function:do_simple_task in 1.20s]
Run child process 45826, task name is: task5
[finished function:do_simple_task in 1.20s]
[finished function:do_simple_task in 1.20s]
Run child process 45827, task name is: task6
[finished function:do_simple_task in 1.20s]
[finished function:do_simple_task in 1.20s]
[finished function:do_simple_task in 1.20s]
All processes end, results is: ['task0', 'task1', 'task2', 'task3', 'task4', 'task5', 'task6']
[finished function:test_use_process_pool in 2.52s]
'''
if __name__ == '__main__':
main()

进程之间的通信

进程间的通信采用队列来实现,实例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# coding:utf-8
# 测试进程间的通信
import time
from multiprocessing import Process,Queue

import random
from functools import wraps

def fn_timer(function):
'''
函数计时装饰器
:param function: 函数对象
:return: 装饰器
'''
@wraps(function)
def function_timer(*args,**kwargs):
# 起始时间
t0 = time.time()
# 调用函数
result = function(*args,**kwargs)
# 结束时间
t1 = time.time()
# 打印函数耗时
print ('[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0))
return result
return function_timer


# 写进程执行的任务
def write(q):
for value in ['A','B','C']:
print ('Put value: {0} to queue.'.format(value))
q.put(value)
time.sleep(random.random())

# 读进程执行的任务
def read(q):
while True:
value = q.get(True)
print ('Get value: {0} from queue.'.format(value))

# 测试进程间的通信
def test_communication_between_process():
q = Queue()
# 写进程
pw = Process(target = write,args = (q,))
# 读进程
pr = Process(target = read,args = (q,))
pw.start()
pr.start()
pw.join()
# 因为读任务是死循环,所以要强行结束
pr.terminate()

def main():
test_communication_between_process()
# 输出
'''
Put value: A to queue.
Get value: A from queue.
Put value: B to queue.
Get value: B from queue.
Put value: C to queue.
Get value: C from queue.
'''

if __name__ == '__main__':
main()

协程

Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。gevent是第三方库,通过greenlet实现协程,其基本思想是:

当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

用协程下载同样的20个网页,实例代码如下:

1
pip install gevent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# coding:utf-8
# 测试协程
import time
import gevent
from gevent.pool import Pool
from gevent import monkey
from functools import wraps

def fn_timer(function):
'''
函数计时装饰器
:param function: 函数对象
:return: 装饰器
'''
@wraps(function)
def function_timer(*args,**kwargs):
# 起始时间
t0 = time.time()
# 调用函数
result = function(*args,**kwargs)
# 结束时间
t1 = time.time()
# 打印函数耗时
print ('[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0))
return result
return function_timer


# 打动态补丁,把标准库中的thread/socket等替换掉,让它们变成非阻塞的
monkey.patch_all()

@fn_timer
def download_using_single_thread(urls):
'''
顺序执行下载多个网页
:param urls: 要下载的网页内容
:return: 响应列表
'''
resps = []
for index, url in enumerate(urls):
time.sleep(1)
print(f'下载第{index+1}个页面:{url}')
return resps

@fn_timer
def download_using_coroutine(urls):
'''
使用协程下载
:param urls: 要下载的网页内容
:return: 响应列表
'''
spawns = []
for url in urls:
spawns.append(gevent.spawn(download_using_single_thread,[url]))
# 在遇到IO操作时,gevent会自动切换,并发执行(异步IO)
rets = gevent.joinall(spawns)
# joinall函数会返回gevent.greenlet.Greenlet对象的列表,如果想要获得每次调用session.get函数的返回结果,还需分别调用每个Greenlet对象的get函数
results = [ret.get() for ret in rets]

@fn_timer
def download_using_coroutine_pool(urls):
# 创建协程池,并设置最大并发量
pool = Pool(4)
# pool.map函数直接返回每次调用session.get函数返回的结果列表
urls = [[url] for url in urls]
rets = pool.map(download_using_single_thread,urls)

def main():
# 1.使用单线程下载4个网页
urls = [f'{i}.html' for i in range(1,5)]
# download_using_single_thread(urls=urls)
# 输出:
'''
下载第1个页面:1.html
下载第2个页面:2.html
下载第3个页面:3.html
下载第4个页面:4.html
[finished function:download_using_single_thread in 4.02s]
'''

# 2.使用协程下载4个网页
# download_using_coroutine(urls=urls)
# 输出:
'''
下载第1个页面:1.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:2.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:3.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:4.html
[finished function:download_using_single_thread in 1.00s]
[finished function:download_using_coroutine in 1.00s]
'''

# 3.使用协程池下载4个网页
download_using_coroutine_pool(urls=urls)
# 输出:
'''
下载第1个页面:1.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:2.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:3.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:4.html
[finished function:download_using_single_thread in 1.00s]
[finished function:download_using_coroutine_pool in 1.00s]
'''

if __name__ == '__main__':
main()

多线程、多进程和协程并发效率的对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# coding:utf-8
# 对比多线程、多进程和协程下载网页
from multiprocessing.dummy import Pool as thread_pool
import time
from multiprocessing import Pool as process_pool
from gevent.pool import Pool
from gevent import monkey
from functools import wraps


def fn_timer(function):
'''
函数计时装饰器
:param function: 函数对象
:return: 装饰器
'''
@wraps(function)
def function_timer(*args,**kwargs):
# 起始时间
t0 = time.time()
# 调用函数
result = function(*args,**kwargs)
# 结束时间
t1 = time.time()
# 打印函数耗时
print ('[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0))
return result
return function_timer


# 打动态补丁,把标准库中的thread/socket等替换掉,让它们变成非阻塞的
monkey.patch_all()

@fn_timer
def download_using_single_thread(urls):
'''
顺序执行下载多个网页
:param urls: 要下载的网页内容
:return: 响应列表
'''
resps = []
for index, url in enumerate(urls):
time.sleep(1)
print(f'下载第{index+1}个页面:{url}')
return resps

# 1. 使用线程池下载多个网页的内容
@fn_timer
def download_using_thread_pool(urls):
pool = thread_pool(4)
# 第一个参数为函数名,第二个参数一个可迭代对象,为函数所需的参数列表
resps = pool.map(download_using_single_thread,[urls])
pool.close()
return resps

# 2. 测试使用进程池
@fn_timer
def download_using_process_pool(urls):
# 创建一个进程池,数字表示一次性同时执行的最大子进程数
pool = process_pool(4)
# 并发执行多个任务,并获取任务返回值
results = pool.map_async(download_using_single_thread,[urls])
pool.close()
return results.get()

# 3. 使用协程池下载
@fn_timer
def download_using_coroutine_pool(urls):
# 创建协程池,并设置最大并发量
pool = Pool(4)
# pool.map函数直接返回每次调用session.get函数返回的结果列表
urls = [[url] for url in urls]
rets = pool.map(download_using_single_thread,urls)

def main():
urls = [f'{i}.html' for i in range(1, 5)]
# 1. 使用线程池下载4个网页
# download_using_thread_pool(urls=urls)
# 输出:
'''
下载第1个页面:1.html
下载第2个页面:2.html
下载第3个页面:3.html
下载第4个页面:4.html
[finished function:download_using_single_thread in 4.01s]
[finished function:download_using_thread_pool in 4.01s]
'''

# 2. 使用进程池下载4个网页
download_using_process_pool(urls=urls)
# 输出:
'''
代码可能存在问题,需要 KeyboardInterrupt 才运行
下载第1个页面:1.html
下载第2个页面:2.html
下载第3个页面:3.html
下载第4个页面:4.html
[finished function:download_using_single_thread in 4.00s]
'''

# 3.使用协程池下载4个网页
# download_using_coroutine_pool(urls=urls)
# 输出:
'''
下载第1个页面:1.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:2.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:3.html
[finished function:download_using_single_thread in 1.00s]
下载第1个页面:4.html
[finished function:download_using_single_thread in 1.00s]
[finished function:download_using_coroutine_pool in 1.00s]
'''

if __name__ == '__main__':
main()

从结果来看,使用协程池的效率还是略高一点。