2.2.1.4 索引体系:步幅

2.2.1.4.1 主要观点

问题

In [28]:

  1. x = np.array([[1, 2, 3],
  2. [4, 5, 6],
  3. [7, 8, 9]], dtype=np.int8)
  4. str(x.data)

Out[28]:

  1. '\x01\x02\x03\x04\x05\x06\x07\x08\t'

item x[1,2]开始在x.data中的哪个字节?

答案(在Numpy)

  • 步幅:寻找一下个元素跳跃的字节数
  • 每个维度一个步幅

In [29]:

  1. x.strides

Out[29]:

  1. (3, 1)

In [31]:

  1. byte_offset = 3*1 + 1*2 # 查找x[1,2]
  2. x.data[byte_offset]

Out[31]:

  1. '\x06'

In [32]:

  1. x[1, 2]

Out[32]:

  1. 6
  • 简单、灵活
2.2.1.4.1.1 C和Fortran顺序

In [34]:

  1. x = np.array([[1, 2, 3],
  2. [4, 5, 6],
  3. [7, 8, 9]], dtype=np.int16, order='C')
  4. x.strides

Out[34]:

  1. (6, 2)

In [35]:

  1. str(x.data)

Out[35]:

  1. '\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\x07\x00\x08\x00\t\x00'
  • 需要跳跃6个字节寻找下一行
  • 需要跳跃2个字节寻找下一列

In [36]:

  1. y = np.array(x, order='F')
  2. y.strides

Out[36]:

  1. (2, 6)

In [37]:

  1. str(y.data)

Out[37]:

  1. '\x01\x00\x04\x00\x07\x00\x02\x00\x05\x00\x08\x00\x03\x00\x06\x00\t\x00'
  • 需要跳跃2个字节寻找下一行
  • 需要跳跃6个字节寻找下一列

更高维度也类似:

  1. - C:最后的维度变化最快(=最小的步幅)
  2. - F:最早的维度变化最快

png

注意:现在我们可以理解.view()的行为:

In [38]:

  1. y = np.array([[1, 3], [2, 4]], dtype=np.uint8).transpose()
  2. x = y.copy()

变换顺序不影响数据的内部布局,只是步幅

In [39]:

  1. x.strides

Out[39]:

  1. (2, 1)

In [40]:

  1. y.strides

Out[40]:

  1. (1, 2)

In [41]:

  1. str(x.data)

Out[41]:

  1. '\x01\x02\x03\x04'

In [42]:

  1. str(y.data)

Out[42]:

  1. '\x01\x03\x02\x04'
  • 当解释为int16时结果会不同
  • .copy()以C顺序(默认)创建新的数组
2.2.1.4.1.2 用整数切片
  • 通过仅改变形状、步幅和可能调整数据指针可以代表任何东西!
  • 不用制造数据的副本

In [43]:

  1. x = np.array([1, 2, 3, 4, 5, 6], dtype=np.int32)
  2. y = x[::-1]
  3. y

Out[43]:

  1. array([6, 5, 4, 3, 2, 1], dtype=int32)

In [44]:

  1. y.strides

Out[44]:

  1. (-4,)

In [45]:

  1. y = x[2:]
  2. y.__array_interface__['data'][0] - x.__array_interface__['data'][0]

Out[45]:

  1. 8

In [46]:

  1. x = np.zeros((10, 10, 10), dtype=np.float)
  2. x.strides

Out[46]:

  1. (800, 80, 8)

In [47]:

  1. x[::2,::3,::4].strides

Out[47]:

  1. (1600, 240, 32)
  • 类似的,变换顺序绝不会创建副本(只是交换的步幅)

In [48]:

  1. x = np.zeros((10, 10, 10), dtype=np.float)
  2. x.strides

Out[48]:

  1. (800, 80, 8)

In [49]:

  1. x.T.strides

Out[49]:

  1. (8, 80, 800)

但是:并不是所有的重排操作都可以通过操纵步幅来完成。

In [3]:

  1. a = np.arange(6, dtype=np.int8).reshape(3, 2)
  2. b = a.T
  3. b.strides

Out[3]:

  1. (1, 2)

到目前为止,都很不错,但是:

In [4]:

  1. str(a.data)

Out[4]:

  1. '\x00\x01\x02\x03\x04\x05'

In [5]:

  1. b

Out[5]:

  1. array([[0, 2, 4],
  2. [1, 3, 5]], dtype=int8)

In [6]:

  1. c = b.reshape(3*2)
  2. c

Out[6]:

  1. array([0, 2, 4, 1, 3, 5], dtype=int8)

这里,没办法用一个给定的步长和a的内存块来表示数组c。因此,重排操作在这里需要制作一个副本。

2.2.1.4.2 例子:用步长伪造维度

步长操作

In [2]:

  1. from numpy.lib.stride_tricks import as_strided
  2. help(as_strided)
  1. Help on function as_strided in module numpy.lib.stride_tricks:
  2. as_strided(x, shape=None, strides=None)
  3. Make an ndarray from the given array with the given shape and strides.

警告as_strided并不检查你是否还待在内存块边界里..

In [9]:

  1. x = np.array([1, 2, 3, 4], dtype=np.int16)
  2. as_strided(x, strides=(2*2, ), shape=(2, ))

Out[9]:

  1. array([1, 3], dtype=int16)

In [10]:

  1. x[::2]

Out[10]:

  1. array([1, 3], dtype=int16)

也可以看一下:stride-fakedims.py

练习

In [ ]:

  1. array([1, 2, 3, 4], dtype=np.int8)
  2. -> array([[1, 2, 3, 4],
  3. [1, 2, 3, 4],
  4. [1, 2, 3, 4]], dtype=np.int8)

仅使用as_strided.:

提示:byte_offset = stride[0]_index[0] + stride[1]_index[1] + …

解密:

步长可以设置为0:

In [11]:

  1. x = np.array([1, 2, 3, 4], dtype=np.int8)
  2. y = as_strided(x, strides=(0, 1), shape=(3, 4))
  3. y

Out[11]:

  1. array([[1, 2, 3, 4],
  2. [1, 2, 3, 4],
  3. [1, 2, 3, 4]], dtype=int8)

In [12]:

  1. y.base.base is x

Out[12]:

  1. True

2.2.1.4.3 广播

  • 用它来做一些有用的事情:[1, 2, 3, 4]和[5, 6, 7]的外积

In [13]:

  1. x = np.array([1, 2, 3, 4], dtype=np.int16)
  2. x2 = as_strided(x, strides=(0, 1*2), shape=(3, 4))
  3. x2

Out[13]:

  1. array([[1, 2, 3, 4],
  2. [1, 2, 3, 4],
  3. [1, 2, 3, 4]], dtype=int16)

In [14]:

  1. y = np.array([5, 6, 7], dtype=np.int16)
  2. y2 = as_strided(y, strides=(1*2, 0), shape=(3, 4))
  3. y2

Out[14]:

  1. array([[5, 5, 5, 5],
  2. [6, 6, 6, 6],
  3. [7, 7, 7, 7]], dtype=int16)

In [15]:

  1. x2 * y2

Out[15]:

  1. array([[ 5, 10, 15, 20],
  2. [ 6, 12, 18, 24],
  3. [ 7, 14, 21, 28]], dtype=int16)

…看起来有一些熟悉…

In [16]:

  1. x = np.array([1, 2, 3, 4], dtype=np.int16)
  2. y = np.array([5, 6, 7], dtype=np.int16)
  3. x[np.newaxis,:] * y[:,np.newaxis]

Out[16]:

  1. array([[ 5, 10, 15, 20],
  2. [ 6, 12, 18, 24],
  3. [ 7, 14, 21, 28]], dtype=int16)
  • 在内部,数组广播的确使用0步长来实现的。

2.2.1.4.4 更多技巧:对角线

也可以看一下 stride-diagonals.py

挑战

  • 提取矩阵对角线的起点:(假定是C内存顺序):

In [ ]:

  1. x = np.array([[1, 2, 3],
  2. [4, 5, 6],
  3. [7, 8, 9]], dtype=np.int32)
  4. x_diag = as_strided(x, shape=(3,), strides=(???,))
  • 提取第一个超级-对角线的起点[2,6]。
  • 那么子对角线呢?

(后两个问题的提示:切片首先移动步长起点的点。)

答案

提取对角线:

In [6]:

  1. x_diag = as_strided(x, shape=(3, ), strides=((3+1)*x.itemsize, ))
  2. x_diag

Out[6]:

  1. array([1, 5, 9], dtype=int32)

首先切片,调整数据指针:

In [8]:

  1. as_strided(x[0, 1:], shape=(2, ), strides=((3+1)*x.itemsize, ))

Out[8]:

  1. array([2, 6], dtype=int32)

In [9]:

  1. as_strided(x[1:, 0], shape=(2, ), strides=((3+1)*x.itemsize, ))

Out[9]:

  1. array([4, 8], dtype=int32)

笔记

In [7]:

  1. y = np.diag(x, k=1)
  2. y

Out[7]:

  1. array([2, 6], dtype=int32)

但是

In [8]:

  1. y.flags.owndata

Out[8]:

  1. False

这是一个副本?!

也可以看一下stride-diagonals.py

挑战

计算张量的迹:

In [9]:

  1. x = np.arange(5*5*5*5).reshape(5,5,5,5)
  2. s = 0
  3. for i in xrange(5):
  4. for j in xrange(5):
  5. s += x[j,i,j,i]

通过跨越并且在结果上使用sum()

In [ ]:

  1. y = as_strided(x, shape=(5, 5), strides=(TODO, TODO))
  2. s2 = ...
  3. assert s == s2

答案

In [ ]:

  1. y = as_strided(x, shape=(5, 5), strides=((5*5*5 + 5)*x.itemsize,
  2. (5*5 + 1)*x.itemsize))
  3. s2 = y.sum()

2.2.1.4.5 CPU缓存效果

内存布局可以影响性能:

In [13]:

  1. x = np.zeros((20000,))
  2. y = np.zeros((20000*67,))[::67]
  3. x.shape, y.shape

Out[13]:

  1. ((20000,), (20000,))

In [14]:

  1. %timeit x.sum()
  1. The slowest run took 20.69 times longer than the fastest. This could mean that an intermediate result is being cached
  2. 10000 loops, best of 3: 15.4 µs per loop

In [15]:

  1. %timeit y.sum()
  1. The slowest run took 114.83 times longer than the fastest. This could mean that an intermediate result is being cached
  2. 10000 loops, best of 3: 53 µs per loop

In [16]:

  1. x.strides, y.strides

Out[16]:

  1. ((8,), (536,))

小步长更快?

2.2.1.4 索引体系:步幅 - 图2

  • CPU从主内存中拉取数据到缓存块 pulls data from main memory to its cache in blocks
  • 如果需要数据项连续操作适应于一个内存块(小步长):
    • 需要更少的迁移
    • 更快

也可以看一下numexpr设计用来减轻数组计算时的缓存效果。

2.2.1.4.6 例子:原地操作(买者当心)

有时,

In [ ]:

  1. a -= b

并不等同于

In [ ]:

  1. a -= b.copy()

In [21]:

  1. x = np.array([[1, 2], [3, 4]])
  2. x -= x.transpose()
  3. x

Out[21]:

  1. array([[ 0, -1],
  2. [ 1, 0]])

In [22]:

  1. y = np.array([[1, 2], [3, 4]])
  2. y -= y.T.copy()
  3. y

Out[22]:

  1. array([[ 0, -1],
  2. [ 1, 0]])
  • xx.transpose()共享数据
  • x -= x.transpose()逐个元素修改数据…
  • 因为xx.transpose()步长不同,修改后的数据重新出现在RHS