【翻译】从Python到Numpy

http://www.labri.fr/perso/nrougier/from-python-to-numpy/

Copyright (c) 2017 - Nicolas P. Rougier Nicolas.Rougier@inria.fr

关于Numpy已有相当多的书籍(参见参考文献),一个合理的问题是再写一本书是否有必要。正如您可能已经通过阅读这些内容所猜测的那样,我个人的回答是肯定的,主要是因为我认为有一种不同的方法可以通过矢量化专注于从Python迁移到Numpy。你在书中找不到很多技巧,这些技巧大多是通过经验学习的。本书的目标是解释其中的一些技术,并提供在此过程中获得此体验的机会。

http://www.labri.fr/perso/nrougier/from-python-to-numpy

前言

关于作者

Nicolas P. RougierInria的全职研究科学家,Inria是法国国家计算机科学与控制研究所。这是一个由研究和教育部以及经济财政和工业部双重监督下的公共科技机构(EPST)。Nicolas P. Rougier正在Mnemosyne项目中工作,该项目位于综合计算神经科学与神经退行性疾病研究所,波尔多计算机科学研究实验室(LaBRI),波尔多大学和国家科学研究中心(CNRS)。

他已经使用Python超过15年,并且在神经科学,机器学习和高级可视化(OpenGL)建模方面已经超过10年了。Nicolas P. Rougier是几本在线资源和教程(Matplotlib,numpy,OpenGL)的作者,他在波尔多大学以及世界各地的各种会议和学校(SciPy,EuroScipy等)教授Python,numpy和科学可视化。他还是热门文章Ten Simple Rules for Better Figures和一个流行的matplotlib教程的作者。

关于这本书

本书以restructured text格式编写,并使用docutils python包中rst2html.py提供的命令行生成 。

如果要重建html输出,请从顶部目录中键入:

1
2
3
$ rst2html.py --link-stylesheet --cloak-email-addresses \
--toc-top-backlinks --stylesheet = book.css \
--stylesheet-dirs=. book.rst book.html

这些来源可从https://github.com/rougier/from-python-to-numpy获得。

先决条件

这不是Python初学者指南,你应该达到Python中级水平并且最好是具有numpy初学者水平。如果没有达到这个要求,请查看参考文献获取帮助。

约定

我们将使用以下通用的命名方式。如果没有明确说明,每个脚本都应该导入numpy,scipy和matplotlib:

1
2
3
import  numpy  as  np 
import scipy as sp
import matplotlib.pyplot as plt

我们使用的软件版本如下(截至2017年1月的最新版本):

Packages Version
Python 3.6.0
Numpy 1.12.0
Scipy 0.18.1
Matplotlib 2.0.0

如何贡献

如果您想为本书做出贡献,您可以:

出版

如果您是有兴趣发布本书的编辑,如果您同意此版本和所有后续版本开放访问(即在此地址在线),您可以与我联系,您知道如何处理重组文本(Word不是选项),你提供真正的附加值和支持服务,更重要的是,你有一个真正神奇的乳胶书模板(并警告我有点挑剔排版和设计:Edward Tufte是我的英雄) 。还在?

License

本作品采用知识共享署名 - 非商业性共享4.0国际许可协议授权。你可以自由地:

  • 共享 - 以任何媒体或格式复制和重新分发材料
  • 适应 - 重新混合,转换和构建材料

只要您遵守许可条款,许可人就不能撤销这些自由。

代码

该代码根据OSI批准的BSD 2条款许可进行许可。

介绍

简单的例子

注意:您可以使用常规python shell或从IPython会话或Jupyter笔记本中执行以下代码文件夹中的任何代码。在这种情况下,您可能希望使用magic命令%timeit而不是我编写的自定义命令。

Numpy就是矢量化。如果您熟悉Python,这是您将面临的主要困难,因为您需要改变您的思维方式,并适应那些叫做“向量(vectors)”,“数组(arrays)”,“视图(views)”或“ufuncs”的新朋友。

我们来看一个非常简单的例子,random walk。一种可行的基于面向对象的方法是定义一个RandomWalker类并编写一个walk方法,该方法将在每个(随机)步骤之后返回当前位置。虽然这么写的可读性很好,但执行很慢:

基于面向对象的实现

1
2
3
4
5
6
7
8
9
10
11
12
class RandomWalker:
def __init__(self):
self.position = 0

def walk(self, n):
self.position = 0
for i in range(n):
yield self.position
self.position += 2*random.randint(0, 1) - 1

walker = RandomWalker()
walk = [position for position in walker.walk(1000)]

基准测试的输出:

1
2
3
4
>>> from tools import timeit
>>> walker = RandomWalker()
>>> timeit("[position for position in walker.walk(n=10000)]", globals())
10 loops, best of 3: 15.7 msec per loop

程序化的实现

对于这样一个简单的问题,我们可以保留类的定义,并且只关注在每个随机步骤之后计算连续位置的walk方法。

1
2
3
4
5
6
7
8
9
def random_walk(n):
position = 0
walk = [position]
for i in range(n):
position += 2*random.randint(0, 1)-1
walk.append(position)
return walk

walk = random_walk(1000)

这个新方法节省了一些CPU周期但不是那么多,因为这个函数与面向对象的方法几乎相同,我们节省的几个周期可能来自内部Python面向对象的机制。

1
2
3
>>> from tools import timeit
>>> timeit("random_walk(n=10000)", globals())
10 loops, best of 3: 15.6 msec per loop

矢量化实现

但是我们可以使用Python的itertools模块来做得更好,它提供了一组函数来创建迭代器以实现高效循环。如果我们观察到随机游走是步骤的累积,我们可以通过首先生成所有步骤并在没有任何循环的情况下累积它们来重写函数:

1
2
3
4
5
6
7
def random_walk_faster(n=1000):
from itertools import accumulate
# Only available from Python 3.6
steps = random.choices([-1,+1], k=n)
return [0]+list(accumulate(steps))

walk = random_walk_faster(1000)

事实上,我们只是矢量化了我们的功能。我们首先生成所有步骤并使用accumulate函数计算所有位置,而不是循环选择顺序步骤并将它们添加到当前位置。我们摆脱了循环,这使事情变得更快:

1
2
3
>>> from tools import timeit
>>> timeit("random_walk_faster(n=10000)", globals())
10 loops, best of 3: 2.21 msec per loop

与之前的版本相比,我们节省了85%的计算时间,看起来不错。但下面这个新版本的优势在于它使numpy矢量化变得非常简单。我们只需将itertools调用转换为numpy调用即可。

1
2
3
4
5
6
def random_walk_fastest(n=1000):
# No 's' in numpy choice (Python offers choice & choices)
steps = np.random.choice([-1,+1], n)
return np.cumsum(steps)

walk = random_walk_fastest(1000)

这么写并不难,但我们使用numpy却获得了500倍的性能提升:

1
2
3
>>> from tools import timeit
>>> timeit("random_walk_fastest(n=10000)", globals())
1000 loops, best of 3: 14 usec per loop

本书是关于矢量化的,无论是在代码还是问题层面。在查看自定义矢量化之前,我们会看到这种差异很重要。

可读性 vs 速度

在进入下一章之前,我想提醒您一旦熟悉numpy就可能遇到的潜在问题。它是一个非常强大的库,你可以用它来创造奇迹,但大多数时候,这是以可读性为代价的。如果您在撰写本文时没有对您的代码写注释,那么在几周(或可能是几天)之后,您将无法确定某项功能的作用。例如,你能说出下面这两个函数在做什么吗?可能你可以告诉第一个,但第二个不太可能(如果你是JaimeFernándezdelRío ,那么你不需要阅读这本书)。

1
2
3
4
5
6
7
8
9
def function_1(seq, sub):
return [i for i in range(len(seq) - len(sub)) if seq[i:i+len(sub)] == sub]

def function_2(seq, sub):
target = np.dot(sub, sub)
candidates = np.where(np.correlate(seq, sub, mode='valid') == target)[0]
check = candidates[:, np.newaxis] + np.arange(len(sub))
mask = np.all((np.take(seq, check) == sub), axis=-1)
return candidates[mask]

您可能已经猜到,第二个函数是第一个函数的矢量化的、优化的、更快的numpy版本。它比纯Python版本快10倍,但它几乎没有可读性。

剖析一个array

介绍

正如前言中所解释的那样,在阅读本书之前您应该对numpy有一个基本的体验。如果不是这种情况,那么在返回此处之前,最好先从初学者教程开始。因此,我将在此对numpy array进行剖析,特别是关于内存布局,视图,副本和数据类型。如果你希望你写的计算能够从numpy哲学中受益,那么这些就是理解它们的关键概念。

让我们考虑一个简单的例子,我们想要从具有dtype的数组中清除所有值np.float32。如何写出运行速度最快的实现呢?下面的代码相当明显(至少对那些熟悉numpy的人来说),但上面的问题要求找到最快的实现。

1
2
>>> Z = np.ones(4*1000000, np.float32)
>>> Z[...] = 0

如果仔细观察数组的dtype和大小,可以观察到这个数组可以被转换(即查看)到许多其他“兼容”数据类型中。通过兼容,我的意思是Z.size * Z.itemsize可以除以新的dtype项目大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
>>> timeit("Z.view(np.float16)[...] = 0", globals())
100 loops, best of 3: 2.72 msec per loop
>>> timeit("Z.view(np.int16)[...] = 0", globals())
100 loops, best of 3: 2.77 msec per loop
>>> timeit("Z.view(np.int32)[...] = 0", globals())
100 loops, best of 3: 1.29 msec per loop
>>> timeit("Z.view(np.float32)[...] = 0", globals())
100 loops, best of 3: 1.33 msec per loop
>>> timeit("Z.view(np.int64)[...] = 0", globals())
100 loops, best of 3: 874 usec per loop
>>> timeit("Z.view(np.float64)[...] = 0", globals())
100 loops, best of 3: 865 usec per loop
>>> timeit("Z.view(np.complex128)[...] = 0", globals())
100 loops, best of 3: 841 usec per loop
>>> timeit("Z.view(np.int8)[...] = 0", globals())
100 loops, best of 3: 630 usec per loop

有趣的是,清除所有值的明显方法并不是最快的。通过将阵列转换为更大的数据类型np.float64,我们获得了25%的速度因子。但是,通过将数组视为字节数组(np.int8),我们获得了50%的因子。这种加速的原因可以在numpy内部运作原理和编译器优化中找到。这个简单的例子说明了一个完美将在下一节中看到的numpy哲学。

内存布局

numpy文档中对ndarray的描述很清楚:

An instance of class ndarray consists of a contiguous one-dimensional segment of computer memory (owned by the array, or by some other object), combined with an indexing scheme that maps N integers into the location of an item in the block.

ndarray类的一个实例由一个连续的一维内存段(由数组或某个其他对象拥有)和一个索引方案组成,该索引方案将N个整数映射到块中一个项的位置。

换句话说,array主要是连续的存储器块,其部分可以使用索引方案来访问。这种索引方案又由shapedata type定义,这正是您定义新数组时所需要的:

1
Z = np.arange(9).reshape(3,3).astype(np.int16)

在这里,我们知道Z itemsize是2个字节(int16),shape是(3,3),dimensions是2(len(Z.shape))。

1
2
3
4
5
6
>>> print(Z.itemsize)
2
>>> print(Z.shape)
(3, 3)
>>> print(Z.ndim)
2

此外,由于Z不是视图,我们可以推导出数组的步幅,它定义了遍历数组时每个维度中要步进的字节数。

1
2
3
4
5
>>> strides = Z.shape[1]*Z.itemsize, Z.itemsize
>>> print(strides)
(6, 2)
>>> print(Z.strides)
(6, 2)

通过所有这些信息,我们知道如何访问特定item(由索引元组设计),更准确地说,如何计算开始和结束偏移:

1
2
3
4
offset_start = 0
for i in range(ndim):
offset_start += strides[i]*index[i]
offset_end = offset_start + Z.itemsize

让我们看看使用tobytes 转换方法是否正确:

1
2
3
4
5
6
7
8
9
>>> Z = np.arange(9).reshape(3,3).astype(np.int16)
>>> index = 1,1
>>> print(Z[index].tobytes())
b'\x04\x00'
>>> offset = 0
>>> for i in range(Z.ndim):
... offset + = Z.strides[i]*index[i]
>>> print(Z.tobytes()[offset_start:offset_end]
b'\x04\x00'

实际上,可以从不同的角度(即布局)观察该array:

Item layout

1
2
3
4
5
6
7
8
9
10
11
               shape[1]
(=3)
┌───────────┐

┌ ┌───┬───┬───┐ ┐
│ │ 0 │ 1 │ 2 │ │
│ ├───┼───┼───┤ │
shape[0] │ │ 3 │ 4 │ 5 │ │ len(Z)
(=3) │ ├───┼───┼───┤ │ (=3)
│ │ 6 │ 7 │ 8 │ │
└ └───┴───┴───┘ ┘

展开的item layout

1
2
3
4
5
6
7
┌───┬───┬───┬───┬───┬───┬───┬───┬───┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │
└───┴───┴───┴───┴───┴───┴───┴───┴───┘

└───────────────────────────────────┘
Z.size
(=9)

内存布局(C顺序,大端序)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
                         strides[1]
(=2)
┌─────────────────────┐

┌ ┌──────────┬──────────┐ ┐
│ p+00: │ 00000000 │ 00000000 │ │
│ ├──────────┼──────────┤ │
│ p+02: │ 00000000 │ 00000001 │ │ strides[0]
│ ├──────────┼──────────┤ │ (=2x3)
│ p+04 │ 00000000 │ 00000010 │ │
│ ├──────────┼──────────┤ ┘
│ p+06 │ 00000000 │ 00000011 │
│ ├──────────┼──────────┤
Z.nbytes │ p+08: │ 00000000 │ 00000100 │
(=3x3x2) │ ├──────────┼──────────┤
│ p+10: │ 00000000 │ 00000101 │
│ ├──────────┼──────────┤
│ p+12: │ 00000000 │ 00000110 │
│ ├──────────┼──────────┤
│ p+14: │ 00000000 │ 00000111 │
│ ├──────────┼──────────┤
│ p+16: │ 00000000 │ 00001000 │
└ └──────────┴──────────┘

└─────────────────────┘
Z.itemsize
Z.dtype.itemsize
(=2)

如果我们现在取一个Z的切片,结果是Z的基本数组的一个视图:

1
V = Z[::2,::2]

这样的视图是由shape,dtype 和strides指定的,因为strides不能仅仅从dtype和shape中推断出来:

Item layout

1
2
3
4
5
6
7
8
9
10
11
               shape[1]
(=2)
┌───────────┐

┌ ┌───┬╌╌╌┬───┐ ┐
│ │ 0 │ │ 2 │ │ ┌───┬───┐
│ ├───┼╌╌╌┼───┤ │ │ 0 │ 2 │
shape[0] │ ╎ ╎ ╎ ╎ │ len(Z) → ├───┼───┤
(=2) │ ├───┼╌╌╌┼───┤ │ (=2) │ 6 │ 8 │
│ │ 6 │ │ 8 │ │ └───┴───┘
└ └───┴╌╌╌┴───┘ ┘

展开的item layout

1
2
3
4
5
6
7
8
┌───┬╌╌╌┬───┬╌╌╌┬╌╌╌┬╌╌╌┬───┬╌╌╌┬───┐       ┌───┬───┬───┬───┐
│ 0 │ │ 2 │ ╎ ╎ │ 6 │ │ 8 │ → │ 0 │ 2 │ 6 │ 8 │
└───┴╌╌╌┴───┴╌╌╌┴╌╌╌┴╌╌╌┴───┴╌╌╌┴───┘ └───┴───┴───┴───┘
└─┬─┘ └─┬─┘ └─┬─┘ └─┬─┘
└───┬───┘ └───┬───┘
└───────────┬───────────┘
Z.size
(=4)

内存布局(C顺序,大端序)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
              ┌        ┌──────────┬──────────┐ ┐             ┐
┌─┤ p+00: │ 00000000 │ 00000000 │ │ │
│ └ ├──────────┼──────────┤ │ strides[1] │
┌─┤ p+02: │ │ │ │ (=4) │
│ │ ┌ ├──────────┼──────────┤ ┘ │
│ └─┤ p+04 │ 00000000 │ 00000010 │ │
│ └ ├──────────┼──────────┤ │ strides[0]
│ p+06: │ │ │ │ (=12)
│ ├──────────┼──────────┤ │
Z.nbytes ─┤ p+08: │ │ │ │
(=8) │ ├──────────┼──────────┤ │
│ p+10: │ │ │ │
│ ┌ ├──────────┼──────────┤ ┘
│ ┌─┤ p+12: │ 00000000 │ 00000110 │
│ │ └ ├──────────┼──────────┤
└─┤ p+14: │ │ │
│ ┌ ├──────────┼──────────┤
└─┤ p+16: │ 00000000 │ 00001000 │
└ └──────────┴──────────┘

└─────────────────────┘
Z.itemsize
Z.dtype.itemsize
(=2)

视图和副本

视图和副本是优化数值计算的重要概念。即使我们已经在前一节中操纵了它们,整个故事还是有点复杂。

直接和间接访问

首先,我们必须区分索引花式索引。第一个将始终返回视图,而第二个将返回一个副本。这种差异很重要,因为在第一种情况下,修改视图会修改基本数组,而在第二种情况下则不是这样:

1
2
3
4
5
6
7
8
9
10
>>> Z = np.zeros(9)
>>> Z_view = Z[:3]
>>> Z_view[...] = 1
>>> print(Z)
[ 1. 1. 1. 0. 0. 0. 0. 0. 0.]
>>> Z = np.zeros(9)
>>> Z_copy = Z[[0,1,2]]
>>> Z_copy[...] = 1
>>> print(Z)
[ 0. 0. 0. 0. 0. 0. 0. 0. 0.]

因此,如果你需要花式索引,最好保留你的花式索引的副本(特别是如果计算它很复杂)并使用它:

1
2
3
4
5
>>> Z = np.zeros(9)
>>> index = [0,1,2]
>>> Z[index] = 1
>>> print(Z)
[ 1. 1. 1. 0. 0. 0. 0. 0. 0.]

如果您不确定索引的结果是视图还是副本,则可以检查base的结果。如果是None,那么结果就是副本:

1
2
3
4
5
6
7
8
9
10
11
>>> Z = np.random.uniform(0,1,(5,5))
>>> Z1 = Z[:3,:]
>>> Z2 = Z[[0,1,2], :]
>>> print(np.allclose(Z1,Z2))
True
>>> print(Z1.base is Z)
True
>>> print(Z2.base is Z)
False
>>> print(Z2.base is None)
True

请注意,一些numpy函数在可能的情况下返回视图(例如,ravel),而另一些函数总是返回副本(例如,flatten):

1
2
3
4
5
6
7
>>> Z = np.zeros((5,5))
>>> Z.ravel().base is Z
True
>>> Z[::2,::2].ravel().base is Z
False
>>> Z.flatten().base is Z
False

临时副本

副本可以像上一节中那样明确地进行,但最常见的情况是隐式创建中间副本。当您使用数组进行一些算术时就是这种情况:

1
2
3
>>> X = np.ones(10, dtype=np.int)
>>> Y = np.ones(10, dtype=np.int)
>>> A = 2*X + 2*Y

在上面的示例中,已创建三个中间array。一个用于保存2*X的结果,一个用于保存2*Y的结果,最后一个用于保存2*X+2*Y的结果。在这种特定的情况下,array足够小,这并没有真正有所作为。但是,如果您的数组很大,那么您必须小心这些表达式,并想知道您是否可以采用不同的方式。例如,如果只有最终结果才是重要的,并且之后不需要“X”或“Y”,那么另一种解决方案是:

1
2
3
4
5
>>> X = np.ones(10, dtype=np.int)
>>> Y = np.ones(10, dtype=np.int)
>>> np.multiply(X, 2, out=X)
>>> np.multiply(Y, 2, out=Y)
>>> np.add(X, Y, out=X)

使用此替代解决方案,没有创建临时array。问题是,还有许多其他情况需要创建这样的副本,这会影响性能,如下例所示:

1
2
3
4
5
6
7
8
9
10
>>> X = np.ones(1000000000, dtype=np.int)
>>> Y = np.ones(1000000000, dtype=np.int)
>>> timeit("X = X + 2.0*Y", globals())
100 loops, best of 3: 3.61 ms per loop
>>> timeit("X = X + 2*Y", globals())
100 loops, best of 3: 3.47 ms per loop
>>> timeit("X += 2*Y", globals())
100 loops, best of 3: 2.79 ms per loop
>>> timeit("np.add(X, Y, out=X); np.add(X, Y, out=X)", globals())
1000 loops, best of 3: 1.57 ms per loop

结论

最后,我们将做一个练习。给定两个向量Z1和Z2。我们想知道Z2是否是Z1的视图,如果是,这是什么视图?

1
2
>>> Z1 = np.arange(10)
>>> Z2 = Z1[1:-1:2]
1
2
3
4
5
6
   ╌╌╌┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬╌╌
Z1 │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │
╌╌╌┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴╌╌
╌╌╌╌╌╌╌┬───┬╌╌╌┬───┬╌╌╌┬───┬╌╌╌┬───┬╌╌╌╌╌╌╌╌╌╌
Z2 │ 1 │ │ 3 │ │ 5 │ │ 7 │
╌╌╌╌╌╌╌┴───┴╌╌╌┴───┴╌╌╌┴───┴╌╌╌┴───┴╌╌╌╌╌╌╌╌╌╌

首先,我们需要检查Z1是否是Z2的base

1
2
>>> print(Z2.base is Z1)
True

此时,我们知道Z2是Z1的视图,这意味着Z2可以表示为Z1[start:stop:step]。困难在于找到start,stop和step。对于step,我们可以使用任何数组的step属性,该属性给出每个维度中从一个元素到另一个元素的字节数。在我们的例子中,因为两个数组都是一维的,所以我们只能直接比较第一步:

1
2
3
>>> step = Z2.strides[0] // Z1.strides[0]
>>> print(step)
2

下一个困难是找到start和stop的索引。为此,我们可以利用byte_bounds方法返回指向数组端点的指针。

1
2
3
4
5
6
7
8
9
10
11
  byte_bounds(Z1)[0]                  byte_bounds(Z1)[-1]
↓ ↓
╌╌╌┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬╌╌
Z1 │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │
╌╌╌┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴╌╌

byte_bounds(Z2)[0] byte_bounds(Z2)[-1]
↓ ↓
╌╌╌╌╌╌╌┬───┬╌╌╌┬───┬╌╌╌┬───┬╌╌╌┬───┬╌╌╌╌╌╌╌╌╌╌
Z2 │ 1 │ │ 3 │ │ 5 │ │ 7 │
╌╌╌╌╌╌╌┴───┴╌╌╌┴───┴╌╌╌┴───┴╌╌╌┴───┴╌╌╌╌╌╌╌╌╌╌
1
2
3
4
5
6
7
>>> offset_start = np.byte_bounds(Z2)[0] - np.byte_bounds(Z1)[0]
>>> print(offset_start) # bytes
8

>>> offset_stop = np.byte_bounds(Z2)[-1] - np.byte_bounds(Z1)[-1]
>>> print(offset_stop) # bytes
-16

使用itemsize并考虑到offset_stop为负(Z2的端点在逻辑上小于Z1数组的端点),将这些偏移量转换为索引非常简单。因此,我们需要添加项目大小Z1来获得正确的结束索引。

1
2
3
4
>>> start = offset_start // Z1.itemsize
>>> stop = Z1.size + offset_stop // Z1.itemsize
>>> print(start, stop, step)
1, 8, 2

最后,我们测试我们的结果:

1
2
>>> print(np.allclose(Z1[start:stop:step], Z2))
True

作为练习,您可以通过考虑以下因素来改进这个第一个非常简单的实现:

  • 负的step
  • 多维数组

练习的答案

代码矢量化

介绍

代码矢量化意味着您尝试解决的问题本质上是可矢量化的,只需要一些简单的技巧就可以使其更快。当然,这并不意味着它很容易或很直观,但至少它不需要完全重新思考你的问题(正如 问题矢量化章节中的情况)。尽管如此,它可能需要一些经验来了解代码在哪里可以矢量化。让我们通过一个简单的例子来说明这一点,在这个例子中,我们想要对两个整数列表进行逐项求和。使用纯Python的一个简单方法是:

1
2
def add_python(Z1,Z2):
return [z1+z2 for (z1,z2) in zip(Z1,Z2)]

使用numpy可以非常容易地将上面的问题矢量化:

1
2
def add_numpy(Z1,Z2):
return np.add(Z1,Z2)

毫无疑问,对这两种方法进行基准测试表明,第二种方法速度最快,只有一个数量级。

1
2
3
4
5
6
>>> Z1 = random.sample(range(1000), 100)
>>> Z2 = random.sample(range(1000), 100)
>>> timeit("add_python(Z1, Z2)", globals())
1000 loops, best of 3: 68 usec per loop
>>> timeit("add_numpy(Z1, Z2)", globals())
10000 loops, best of 3: 1.14 usec per loop

第二种方法不仅速度更快,而且自然适应Z1和Z2的形状。这就是为什么我们没有写Z1 + Z2的原因,因为如果Z1和Z2都是列表的话,它就不起作用了。在第一种Python方法中,内部+根据两个对象的性质有不同的解释,因此如果我们考虑两个嵌套列表,我们会得到以下输出:

1
2
3
4
5
6
7
8
9
>>> Z1 = [[1, 2], [3, 4]]
>>> Z2 = [[5, 6], [7, 8]]
>>> Z1 + Z2
[[1, 2], [3, 4], [5, 6], [7, 8]]
>>> add_python(Z1, Z2)
[[1, 2, 5, 6], [3, 4, 7, 8]]
>>> add_numpy(Z1, Z2)
[[ 6 8]
[10 12]]

第一种方法将两个列表连接在一起,第二种方法将内部列表连接在一起,最后一种方法计算预期的(数字)值。作为练习,您可以重写Python版本,以便它接受任何深度的嵌套列表。

统一矢量化

统一矢量化是矢量化的最简单形式,其中所有元素在每个time step共享相同的计算,而不需要对任何元素进行特定的处理。一个典型的例子是John Conway发明的生命的游戏(见下文),它是细胞自动机最早的例子之一。这些细胞自动机可以方便地被看作是一组细胞,这些细胞与相邻细胞的概念连接在一起,它们的矢量化非常简单。让我先定义一下这个游戏,然后我们看看如何对它矢量化。

下图说明:圆锥形纺织蜗牛的壳上呈现出细胞自动机模式。图片来自Richard Ling,2005。

生命的游戏

注释:这是维基百科关于生命的游戏条目的摘录

生命游戏是英国数学家约翰·何顿·康威在1970年设计的细胞自动机。这是细胞自动机最著名的例子。“游戏”实际上是一个零玩家游戏,这意味着它的进化是由它的初始状态决定的,不需要人类玩家的输入。一个人通过创建一个初始配置并观察它是如何进化来与生命游戏互动的。

生命游戏(Game of Life)的宇宙是一个由正方形细胞组成的无限二维正交网格,每个细胞都处于两种可能状态之一,活着或死去。每个单元与其八个相邻单元相互作用,这八个相邻单元是直接水平、垂直或对角相邻的单元。在时间的每一步,都会发生以下转变:

  • 任何少于两个活邻居的活细胞都会死亡,就好像是由于人口不足引起的衰亡。
  • 任何有三个以上活邻居的活细胞都会死亡,就像过度拥挤一样。
  • 任何有两三个活邻居的活细胞,都会原封不动地延续到下一代。
  • 任何只有三个活邻居的死细胞都变成活细胞。

最初的模式构成了系统的“种子”。第一代是通过将上述规则同时应用于种子中的每一个细胞而产生的——出生和死亡同时发生,这种不连续的时刻有时被称为tick。(换句话说,每一代都是前一代的pure function。)这些规则继续被反复应用,以创造下一代。

Python实现

注释:我们可以使用更高效的python array接口,但是使用熟悉的list对象更方便。

在纯Python中,我们可以使用一系列代表细胞应该进化的棋盘的列表来编写生命游戏。这种板将配备0的边界,这样可以通过在计算邻居数量时避免对边界进行特定测试来稍微加快速度。

1
2
3
4
5
6
Z = [[0,0,0,0,0,0],
[0,0,0,1,0,0],
[0,1,0,1,0,0],
[0,0,1,1,0,0],
[0,0,0,0,0,0],
[0,0,0,0,0,0]]

考虑到边界因素,计算邻居是很简单的:

1
2
3
4
5
6
7
8
9
def compute_neighbours(Z):
shape = len(Z), len(Z[0])
N = [[0,]*(shape[0]) for i in range(shape[1])]
for x in range(1,shape[0]-1):
for y in range(1,shape[1]-1):
N[x][y] = Z[x-1][y-1]+Z[x][y-1]+Z[x+1][y-1] \
+ Z[x-1][y] +Z[x+1][y] \
+ Z[x-1][y+1]+Z[x][y+1]+Z[x+1][y+1]
return N

为了及时迭代一步,我们只需计算每个内部单元的邻居数量,然后根据上述四条规则更新整个面板:

1
2
3
4
5
6
7
8
9
def iterate(Z):
N = compute_neighbours(Z)
for x in range(1,shape[0]-1):
for y in range(1,shape[1]-1):
if Z[x][y] == 1 and (N[x][y] < 2 or N[x][y] > 3):
Z[x][y] = 0
elif Z[x][y] == 0 and N[x][y] == 3:
Z[x][y] = 1
return Z

下图显示了4x4区域上的四次迭代,初始状态是glider,这是Richard K. Guy在1970年发现的结构。

下图注释:众所周知,glider模式在4次迭代中沿着对角线复制自己移动一步。

Numpy实现

从纯Python实现的版本来看,生命游戏的矢量化需要两部分,一部分负责计算邻居,另一部分负责执行规则。如果我们记得我们在竞技场周围增加了一个零边界,邻居计数就相对容易了。通过考虑竞技场的局部视图,我们实际上可以非常直观地访问邻居,如下图中一维情况所示:

1
2
3
4
5
6
7
8
9
10
11
               ┏━━━┳━━━┳━━━┓───┬───┐
Z[:-2] ┃ 0 ┃ 1 ┃ 1 ┃ 1 │ 0 │ (left neighbours)
┗━━━┻━━━┻━━━┛───┴───┘
↓︎
┌───┏━━━┳━━━┳━━━┓───┐
Z[1:-1] │ 0 ┃ 1 ┃ 1 ┃ 1 ┃ 0 │ (actual cells)
└───┗━━━┻━━━┻━━━┛───┘

┌───┬───┏━━━┳━━━┳━━━┓
Z[+2:] │ 0 │ 1 ┃ 1 ┃ 1 ┃ 0 ┃ (right neighbours)
└───┴───┗━━━┻━━━┻━━━┛

在二维视图下只需要增加一点运算来确保考虑所有的八个邻居。

1
2
3
4
N = np.zeros(Z.shape, dtype=int)
N[1:-1,1:-1] += (Z[ :-2, :-2] + Z[ :-2,1:-1] + Z[ :-2,2:] +
Z[1:-1, :-2] + Z[1:-1,2:] +
Z[2: , :-2] + Z[2: ,1:-1] + Z[2: ,2:])

对于规则执行,我们可以使用numpy的argwhere方法编写第一个版本,该方法将给出给定条件为真时的索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Flatten arrays
N_ = N.ravel()
Z_ = Z.ravel()

# Apply rules
R1 = np.argwhere( (Z_==1) & (N_ < 2) )
R2 = np.argwhere( (Z_==1) & (N_ > 3) )
R3 = np.argwhere( (Z_==1) & ((N_==2) | (N_==3)) )
R4 = np.argwhere( (Z_==0) & (N_==3) )

# Set new values
Z_[R1] = 0
Z_[R2] = 0
Z_[R3] = Z_[R3]
Z_[R4] = 1

# Make sure borders stay null
Z[0,:] = Z[-1,:] = Z[:,0] = Z[:,-1] = 0

即使第一个版本不使用嵌套循环,它也远远不是最佳的,因为使用了四个argwhere调用,这可能非常慢。相反,我们可以将这些规则分解成存活的细胞(状态为1)和繁衍的细胞。为此,我们可以利用Numpy布尔功能,非常自然地编写:

注释:我们没有写Z = 0,因为这将简单地把值0赋给Z,然后它将变成一个简单的标量。

1
2
3
4
birth = (N==3)[1:-1,1:-1] & (Z[1:-1,1:-1]==0)
survive = ((N==2) | (N==3))[1:-1,1:-1] & (Z[1:-1,1:-1]==1)
Z[...] = 0
Z[1:-1,1:-1][birth | survive] = 1

如果您查看birth行和survive行,您将会看到这两个变量是数组,它们可以在清除数据后用于将Z值设置为1。

说明:灰度的程度表示一个单元格在过去有多活跃。

练习

化学物质的反应和扩散会产生各种各样的模式,让人想起自然界中常见的模式。Gray-Scott方程模拟了这种反应。有关这个化学系统的更多信息,请参阅《简单系统中的复杂模式》(John E. Pearson,《科学》,第261卷,1993年)。让我们考虑两种化学物质U和V,其各自浓度为u和v,扩散速率为Du和Dv。V以k的转换速率转换为P. f代表加入U和消耗U、V和P的过程速率。这可以写成:

Chemical reaction Equations
U + 2V → 3V $u̇ = Du∇^2u − uv^2 + f(1 − u)$
V → P $v̇ = Dv∇^2v + uv^2 − (f + k)v$

以生命游戏为例,尝试实现这样的反应扩散系统。这里有一组有趣的参数要测试:

Name Du Dv f k
Bacteria 1 0.16 0.08 0.035
Bacteria 2 0.14 0.06 0.035
Coral 0.16 0.08 0.060 0.062
Fingerprint 0.19 0.05 0.060 0.062
Spirals 0.10 0.10 0.018 0.050
Spirals Dense 0.12 0.08 0.020
Spirals Fast 0.10 0.16 0.020
Unstable 0.16 0.08 0.020 0.055
Worms 1 0.16 0.08 0.050 0.065
Worms 2 0.16 0.08 0.054 0.063
Zebrafish 0.16 0.08 0.035 0.060

下图显示了特定参数集模型的一些动画。

说明:反应扩散灰色斯科特模型。从左到右,细菌1,珊瑚和螺旋密集。

源码

参考

时间矢量化

Mandelbrot集合是复数c的集合,对于该集合,当从$z = 0$开始迭代,函数$f_c(z) = z^2 + c$不发散,即序列$f_c(0)$, $f_c(f_c(0)$,等等,保持绝对值有界。这很容易计算,但可能需要很长时间,因为您需要确保给定的数字不会发散。这通常是通过迭代计算到最大迭代次数来完成的,在此之后,如果该次数仍然在某个界限内,则认为它是不发散的。当然,迭代次数越多,精度越高。

下图说明:罗马西兰花,表现出近似于自然分形的自相似形式。Jon Sullivan的照片,2004年。

Python实现

一个纯python实现:

1
2
3
4
5
6
7
8
9
10
11
def mandelbrot_python(xmin, xmax, ymin, ymax, xn, yn, maxiter, horizon=2.0):
def mandelbrot(z, maxiter):
c = z
for n in range(maxiter):
if abs(z) > horizon:
return n
z = z*z + c
return maxiter
r1 = [xmin+i*(xmax-xmin)/xn for i in range(xn)]
r2 = [ymin+i*(ymax-ymin)/yn for i in range(yn)]
return [mandelbrot(complex(r, i),maxiter) for r in r1 for i in r2]

这段代码有趣的(慢的)部分是mandelbrot函数,它实际上计算序列$f_c(f_c(f_c…))$。这种代码的矢量化并不简单,因为内部返回意味着要对每个元素进行不同的处理。一旦它发散了,我们就不需要再迭代了,我们可以在发散时安全地返回迭代计数。问题是在numpy也要这么做。但是怎么做呢?

Numpy实现

诀窍是在每次迭代中搜索尚未发散的值,并更新这些值的相关信息,并且只更新这些值。因为我们从Z = 0开始,我们知道每个值将至少更新一次(当它们等于0时,它们还没有发散),并且一旦它们发散就会停止更新。为此,我们将使用带有less(x1,x2)函数的numpy fancy索引,该函数按元素返回(x1 < x2)的真值。

1
2
3
4
5
6
7
8
9
10
11
12
def mandelbrot_numpy(xmin, xmax, ymin, ymax, xn, yn, maxiter, horizon=2.0):
X = np.linspace(xmin, xmax, xn, dtype=np.float32)
Y = np.linspace(ymin, ymax, yn, dtype=np.float32)
C = X + Y[:,None]*1j
N = np.zeros(C.shape, dtype=int)
Z = np.zeros(C.shape, np.complex64)
for n in range(maxiter):
I = np.less(abs(Z), horizon)
N[I] = n
Z[I] = Z[I]**2 + C[I]
N[N == maxiter-1] = 0
return Z, N

以下是基准:

1
2
3
4
5
6
7
>>> xmin, xmax, xn = -2.25, +0.75, int(3000/3)
>>> ymin, ymax, yn = -1.25, +1.25, int(2500/3)
>>> maxiter = 200
>>> timeit("mandelbrot_python(xmin, xmax, ymin, ymax, xn, yn, maxiter)", globals())
1 loops, best of 3: 6.1 sec per loop
>>> timeit("mandelbrot_numpy(xmin, xmax, ymin, ymax, xn, yn, maxiter)", globals())
1 loops, best of 3: 1.15 sec per loop

更快的numpy实现

收益大约是我们预期的5倍。部分问题是np.less函数意味着每次迭代都要进行xn × yn测试,而我们知道有些值已经偏离了。即使这些测试是在C级(通过numpy)进行的,成本仍然很高。Dan Goodman提出的另一种在每次迭代中处理动态数组的方法,只存储尚未发散的点。它需要更多的行,但结果更快,与Python版本相比,速度提高了10倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def mandelbrot_numpy_2(xmin, xmax, ymin, ymax, xn, yn, itermax, horizon=2.0):
Xi, Yi = np.mgrid[0:xn, 0:yn]
Xi, Yi = Xi.astype(np.uint32), Yi.astype(np.uint32)
X = np.linspace(xmin, xmax, xn, dtype=np.float32)[Xi]
Y = np.linspace(ymin, ymax, yn, dtype=np.float32)[Yi]
C = X + Y*1j
N_ = np.zeros(C.shape, dtype=np.uint32)
Z_ = np.zeros(C.shape, dtype=np.complex64)
Xi.shape = Yi.shape = C.shape = xn*yn

Z = np.zeros(C.shape, np.complex64)
for i in range(itermax):
if not len(Z): break

# Compute for relevant points only
np.multiply(Z, Z, Z)
np.add(Z, C, Z)

# Failed convergence
I = abs(Z) > horizon
N_[Xi[I], Yi[I]] = i+1
Z_[Xi[I], Yi[I]] = Z[I]

# Keep going with those who have not diverged yet
np.negative(I,I)
Z = Z[I]
Xi, Yi = Xi[I], Yi[I]
C = C[I]
return Z_.T, N_.T

基准测试结果:

1
2
>>> timeit("mandelbrot_numpy_2(xmin, xmax, ymin, ymax, xn, yn, maxiter)", globals())
1 loops, best of 3: 510 msec per loop

可视化

为了可视化我们的结果,我们可以使用matplotlib imshow命令直接显示N array,但是这将产生一个“带状”图像,这是我们一直使用的转义计数算法的已知结果。这种条带可以通过使用分数转义计数来消除。这可以通过测量迭代点落在逃逸截止点之外的距离来实现。参见下面关于逃逸计数重正化的参考资料。这是一张我们使用重新计数归一化的结果图片,并添加了一个幂归一化的颜色映射(gamma=0.3)以及明暗处理。

下图说明:matplotlib使用重新计数归一化、幂归一化颜色映射(gamma=0.3)和明暗处理渲染的Mandelbrot。

练习

说明:您应该查看ufunc.reduceat方法,该方法在单个轴上使用指定的切片执行(局部)缩减。

我们现在想用Minkowski–Bouligand dimension来测量Mandelbrot集的分形维数。要做到这一点,我们需要用减小的盒子尺寸进行盒子计数(见下图)。可以想象,我们不能使用纯Python,因为它太慢了。本练习的目标是使用numpy编写一个函数,该函数采用二维浮点数组并返回维度。我们将考虑对数组中的值进行规范化(即所有值都在0和1之间)。

说明:英国海岸线的Minkowski–Bouligand dimension约为1.24。

源码

参考

4.4空间矢量化

空间矢量化指的是元素共享相同计算但仅与其他元素的子组交互的情况。 对于生命游戏示例来说已经是这种情况,但在某些情况下会有一个额外的难度,因为子组是动态的,需要在每次迭代时更新。 例如,在粒子系统中,粒子主要与局部邻居相互作用。 这也是模拟植绒行为的“boids”的情况。

下图说明:植绒鸟是生物学中自组织的一个例子。Christoffer A Rasmussen的照片,2012年。

Boids

说明:摘自维基百科词条Boids

Boids是克雷格·雷诺兹(Craig Reynolds)在1986年开发的一个人工生命程序,它模拟鸟类的群集行为。“boid”这个名字对应于“鸟样物体”的简称,它指的是鸟样物体。

和大多数人工生命模拟一样,Boids是紧急行为的一个例子;也就是说,Boid的复杂性来自于遵循一组简单规则的个体代理(在这种情况下是Boid)的相互作用。在最简单的Boids世界中应用的规则如下:

  • 分离:转向以避免挤到附近的鸟群
  • 对准:转向当地鸟群的平均航向
  • 凝聚力:转向同类的平均位置(质心)移动

下图说明:Boids由一套三个局部规则(分离、凝聚和对齐)控制,这些规则作为计算速度和加速度的工具。

Python实现

因为每个boid都是一个自治实体,有几个属性,比如位置和速度,所以从编写Boid类开始似乎很自然:

1
2
3
4
5
6
7
8
9
10
import math
import random
from vec2 import vec2

class Boid:
def __init__(self, x=0, y=0):
self.position = vec2(x, y)
angle = random.uniform(0, 2*math.pi)
self.velocity = vec2(math.cos(angle), math.sin(angle))
self.acceleration = vec2(0, 0)

vec2对象是一个非常简单的类,用两个组件处理所有常见的矢量操作。这将为我们在Boid 类中节省一些代码。请注意,在Python包中有一些向量包,但是对于这样一个简单的例子来说,这是多余的。

Boid对于普通的Python来说是一个困难的例子,因为boid与本地邻居有交互。然而,由于boid正在移动,要找到这样的本地邻居,需要计算每个时间步长到每个boid的距离,以便对那些在给定交互半径内的boid进行排序。因此,编写这三条规则的典型方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
def separation(self, boids):
count = 0
for other in boids:
d = (self.position - other.position).length()
if 0 < d < desired_separation:
count += 1
...
if count > 0:
...

def alignment(self, boids): ...
def cohesion(self, boids): ...

下面的参考文献部分给出了完整的源代码,就不在这里赘述了,代码也并不难。

为了完成图片,我们还可以创建一个Flock对象:

1
2
3
4
5
6
7
8
9
10
class Flock:
def __init__(self, count=150):
self.boids = []
for i in range(count):
boid = Boid()
self.boids.append(boid)

def run(self):
for boid in self.boids:
boid.run(self.boids)

使用这种方法,我们最多可以有50个boids,直到计算时间变得太慢,无法制作平滑的动画。正如您可能已经猜到的,我们可以使用numpy做得更好,但是让我先指出这个Python实现的主要问题。如果你看看代码,你肯定会注意到这个代码有很多冗余部分。更准确地说,我们没有利用欧几里得距离是自反的这一事实,即$|x − y| = |y − x|$。在纯Python的实现中,每个规则(function)计算$n^2$次距离,其实如果缓存处理的恰当的话,$\frac{n^2}{2}$次计算就可以满足要求了。此外,每个规则重新计算每个距离,而不缓存其他函数的结果。到最后,我们计算了$3n^2$次距离而不是$\frac{n^2}{2}$次。

Numpy实现

正如您所料,numpy实现采用了不同的方法,我们将把所有boids收集到一个位置数组和一个速度数组中:

1
2
3
n = 500
velocity = np.zeros((n, 2), dtype=np.float32)
position = np.zeros((n, 2), dtype=np.float32)

第一步是计算所有boids的局部邻域,为此,我们需要计算所有配对距离:

1
2
3
dx = np.subtract.outer(position[:, 0], position[:, 0])
dy = np.subtract.outer(position[:, 1], position[:, 1])
distance = np.hypot(dx, dy)

问题矢量化

参考文献