3-2,中阶API示范

下面的范例使用TensorFlow的中阶API实现线性回归模型和DNN二分类模型。

TensorFlow的中阶API主要包括各种模型层,损失函数,优化器,数据管道,特征列等等。

  1. import tensorflow as tf
  2. #打印时间分割线
  3. @tf.function
  4. def printbar():
  5. today_ts = tf.timestamp()%(24*60*60)
  6. hour = tf.cast(today_ts//3600+8,tf.int32)%tf.constant(24)
  7. minite = tf.cast((today_ts%3600)//60,tf.int32)
  8. second = tf.cast(tf.floor(today_ts%60),tf.int32)
  9. def timeformat(m):
  10. if tf.strings.length(tf.strings.format("{}",m))==1:
  11. return(tf.strings.format("0{}",m))
  12. else:
  13. return(tf.strings.format("{}",m))
  14. timestring = tf.strings.join([timeformat(hour),timeformat(minite),
  15. timeformat(second)],separator = ":")
  16. tf.print("=========="*8+timestring)

一,线性回归模型

1,准备数据

  1. import numpy as np
  2. import pandas as pd
  3. from matplotlib import pyplot as plt
  4. import tensorflow as tf
  5. from tensorflow.keras import layers,losses,metrics,optimizers
  6. #样本数量
  7. n = 400
  8. # 生成测试用数据集
  9. X = tf.random.uniform([n,2],minval=-10,maxval=10)
  10. w0 = tf.constant([[2.0],[-3.0]])
  11. b0 = tf.constant([[3.0]])
  12. Y = X@w0 + b0 + tf.random.normal([n,1],mean = 0.0,stddev= 2.0) # @表示矩阵乘法,增加正态扰动
  1. # 数据可视化
  2. %matplotlib inline
  3. %config InlineBackend.figure_format = 'svg'
  4. plt.figure(figsize = (12,5))
  5. ax1 = plt.subplot(121)
  6. ax1.scatter(X[:,0],Y[:,0], c = "b")
  7. plt.xlabel("x1")
  8. plt.ylabel("y",rotation = 0)
  9. ax2 = plt.subplot(122)
  10. ax2.scatter(X[:,1],Y[:,0], c = "g")
  11. plt.xlabel("x2")
  12. plt.ylabel("y",rotation = 0)
  13. plt.show()

3-2,中阶API示范 - 图1

  1. #构建输入数据管道
  2. ds = tf.data.Dataset.from_tensor_slices((X,Y)) \
  3. .shuffle(buffer_size = 100).batch(10) \
  4. .prefetch(tf.data.experimental.AUTOTUNE)

2,定义模型

  1. model = layers.Dense(units = 1)
  2. model.build(input_shape = (2,)) #用build方法创建variables
  3. model.loss_func = losses.mean_squared_error
  4. model.optimizer = optimizers.SGD(learning_rate=0.001)

3,训练模型

  1. #使用autograph机制转换成静态图加速
  2. @tf.function
  3. def train_step(model, features, labels):
  4. with tf.GradientTape() as tape:
  5. predictions = model(features)
  6. loss = model.loss_func(tf.reshape(labels,[-1]), tf.reshape(predictions,[-1]))
  7. grads = tape.gradient(loss,model.variables)
  8. model.optimizer.apply_gradients(zip(grads,model.variables))
  9. return loss
  10. # 测试train_step效果
  11. features,labels = next(ds.as_numpy_iterator())
  12. train_step(model,features,labels)
  1. def train_model(model,epochs):
  2. for epoch in tf.range(1,epochs+1):
  3. loss = tf.constant(0.0)
  4. for features, labels in ds:
  5. loss = train_step(model,features,labels)
  6. if epoch%50==0:
  7. printbar()
  8. tf.print("epoch =",epoch,"loss = ",loss)
  9. tf.print("w =",model.variables[0])
  10. tf.print("b =",model.variables[1])
  11. train_model(model,epochs = 200)
  1. ================================================================================17:01:48
  2. epoch = 50 loss = 2.56481647
  3. w = [[1.99355531]
  4. [-2.99061537]]
  5. b = [3.09484935]
  6. ================================================================================17:01:51
  7. epoch = 100 loss = 5.96198225
  8. w = [[1.98028314]
  9. [-2.96975136]]
  10. b = [3.09501529]
  11. ================================================================================17:01:54
  12. epoch = 150 loss = 4.79625702
  13. w = [[2.00056171]
  14. [-2.98774862]]
  15. b = [3.09567738]
  16. ================================================================================17:01:58
  17. epoch = 200 loss = 8.26704407
  18. w = [[2.00282311]
  19. [-2.99300027]]
  20. b = [3.09406662]
  1. # 结果可视化
  2. %matplotlib inline
  3. %config InlineBackend.figure_format = 'svg'
  4. w,b = model.variables
  5. plt.figure(figsize = (12,5))
  6. ax1 = plt.subplot(121)
  7. ax1.scatter(X[:,0],Y[:,0], c = "b",label = "samples")
  8. ax1.plot(X[:,0],w[0]*X[:,0]+b[0],"-r",linewidth = 5.0,label = "model")
  9. ax1.legend()
  10. plt.xlabel("x1")
  11. plt.ylabel("y",rotation = 0)
  12. ax2 = plt.subplot(122)
  13. ax2.scatter(X[:,1],Y[:,0], c = "g",label = "samples")
  14. ax2.plot(X[:,1],w[1]*X[:,1]+b[0],"-r",linewidth = 5.0,label = "model")
  15. ax2.legend()
  16. plt.xlabel("x2")
  17. plt.ylabel("y",rotation = 0)
  18. plt.show()

3-2,中阶API示范 - 图2

二, DNN二分类模型

1,准备数据

  1. import numpy as np
  2. import pandas as pd
  3. from matplotlib import pyplot as plt
  4. import tensorflow as tf
  5. from tensorflow.keras import layers,losses,metrics,optimizers
  6. %matplotlib inline
  7. %config InlineBackend.figure_format = 'svg'
  8. #正负样本数量
  9. n_positive,n_negative = 2000,2000
  10. #生成正样本, 小圆环分布
  11. r_p = 5.0 + tf.random.truncated_normal([n_positive,1],0.0,1.0)
  12. theta_p = tf.random.uniform([n_positive,1],0.0,2*np.pi)
  13. Xp = tf.concat([r_p*tf.cos(theta_p),r_p*tf.sin(theta_p)],axis = 1)
  14. Yp = tf.ones_like(r_p)
  15. #生成负样本, 大圆环分布
  16. r_n = 8.0 + tf.random.truncated_normal([n_negative,1],0.0,1.0)
  17. theta_n = tf.random.uniform([n_negative,1],0.0,2*np.pi)
  18. Xn = tf.concat([r_n*tf.cos(theta_n),r_n*tf.sin(theta_n)],axis = 1)
  19. Yn = tf.zeros_like(r_n)
  20. #汇总样本
  21. X = tf.concat([Xp,Xn],axis = 0)
  22. Y = tf.concat([Yp,Yn],axis = 0)
  23. #可视化
  24. plt.figure(figsize = (6,6))
  25. plt.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r")
  26. plt.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g")
  27. plt.legend(["positive","negative"]);

3-2,中阶API示范 - 图3

  1. #构建输入数据管道
  2. ds = tf.data.Dataset.from_tensor_slices((X,Y)) \
  3. .shuffle(buffer_size = 4000).batch(100) \
  4. .prefetch(tf.data.experimental.AUTOTUNE)

2, 定义模型

  1. class DNNModel(tf.Module):
  2. def __init__(self,name = None):
  3. super(DNNModel, self).__init__(name=name)
  4. self.dense1 = layers.Dense(4,activation = "relu")
  5. self.dense2 = layers.Dense(8,activation = "relu")
  6. self.dense3 = layers.Dense(1,activation = "sigmoid")
  7. # 正向传播
  8. @tf.function(input_signature=[tf.TensorSpec(shape = [None,2], dtype = tf.float32)])
  9. def __call__(self,x):
  10. x = self.dense1(x)
  11. x = self.dense2(x)
  12. y = self.dense3(x)
  13. return y
  14. model = DNNModel()
  15. model.loss_func = losses.binary_crossentropy
  16. model.metric_func = metrics.binary_accuracy
  17. model.optimizer = optimizers.Adam(learning_rate=0.001)
  1. # 测试模型结构
  2. (features,labels) = next(ds.as_numpy_iterator())
  3. predictions = model(features)
  4. loss = model.loss_func(tf.reshape(labels,[-1]),tf.reshape(predictions,[-1]))
  5. metric = model.metric_func(tf.reshape(labels,[-1]),tf.reshape(predictions,[-1]))
  6. tf.print("init loss:",loss)
  7. tf.print("init metric",metric)
  1. init loss: 1.13653195
  2. init metric 0.5

3,训练模型

  1. #使用autograph机制转换成静态图加速
  2. @tf.function
  3. def train_step(model, features, labels):
  4. with tf.GradientTape() as tape:
  5. predictions = model(features)
  6. loss = model.loss_func(tf.reshape(labels,[-1]), tf.reshape(predictions,[-1]))
  7. grads = tape.gradient(loss,model.trainable_variables)
  8. model.optimizer.apply_gradients(zip(grads,model.trainable_variables))
  9. metric = model.metric_func(tf.reshape(labels,[-1]), tf.reshape(predictions,[-1]))
  10. return loss,metric
  11. # 测试train_step效果
  12. features,labels = next(ds.as_numpy_iterator())
  13. train_step(model,features,labels)
  1. (<tf.Tensor: shape=(), dtype=float32, numpy=1.2033114>,
  2. <tf.Tensor: shape=(), dtype=float32, numpy=0.47>)
  1. def train_model(model,epochs):
  2. for epoch in tf.range(1,epochs+1):
  3. loss, metric = tf.constant(0.0),tf.constant(0.0)
  4. for features, labels in ds:
  5. loss,metric = train_step(model,features,labels)
  6. if epoch%10==0:
  7. printbar()
  8. tf.print("epoch =",epoch,"loss = ",loss, "accuracy = ",metric)
  9. train_model(model,epochs = 60)
  1. ================================================================================17:07:36
  2. epoch = 10 loss = 0.556449413 accuracy = 0.79
  3. ================================================================================17:07:38
  4. epoch = 20 loss = 0.439187407 accuracy = 0.86
  5. ================================================================================17:07:40
  6. epoch = 30 loss = 0.259921253 accuracy = 0.95
  7. ================================================================================17:07:42
  8. epoch = 40 loss = 0.244920313 accuracy = 0.9
  9. ================================================================================17:07:43
  10. epoch = 50 loss = 0.19839409 accuracy = 0.92
  11. ================================================================================17:07:45
  12. epoch = 60 loss = 0.126151696 accuracy = 0.95
  1. # 结果可视化
  2. fig, (ax1,ax2) = plt.subplots(nrows=1,ncols=2,figsize = (12,5))
  3. ax1.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r")
  4. ax1.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g")
  5. ax1.legend(["positive","negative"]);
  6. ax1.set_title("y_true");
  7. Xp_pred = tf.boolean_mask(X,tf.squeeze(model(X)>=0.5),axis = 0)
  8. Xn_pred = tf.boolean_mask(X,tf.squeeze(model(X)<0.5),axis = 0)
  9. ax2.scatter(Xp_pred[:,0].numpy(),Xp_pred[:,1].numpy(),c = "r")
  10. ax2.scatter(Xn_pred[:,0].numpy(),Xn_pred[:,1].numpy(),c = "g")
  11. ax2.legend(["positive","negative"]);
  12. ax2.set_title("y_pred");

3-2,中阶API示范 - 图4

如果对本书内容理解上有需要进一步和作者交流的地方,欢迎在公众号“Python与算法之美”下留言。作者时间和精力有限,会酌情予以回复。

也可以在公众号后台回复关键字:加群,加入读者交流群和大家讨论。

image.png