# -*- coding: utf-8 -*-
from . import data_process as dp
from . import train, evaluate, fcnet, hpmodel, nodeframe, save
import torch
import numpy as np
import matplotlib.pyplot as plt
[docs]class ANN(train.Train):
"""Reconstruct functions with Artificial Neural Network.
Parameters
----------
data : array-like
An array with shape of (N, 3), where N is the number of data points, each column represents :math:`X, Y, \sigma_Y`.
hidden_layer : int, optional
The number of hidden layers. Default: 1
mid_node : int, optional
The number of nodes (or neurons) of the middle layer. Default: 4096
hp_model : int, optional
The hyperparameter models, 'rec_1' (no batch normalization) or 'rec_2' (with batch normalization). Default: 'rec_1'
loss_func : str, optional
The loss function, there are three loss functions in this code, L1Loss ('L1'),
MSELoss ('MSE'), and SmoothL1Loss ('SmoothL1'). Default: 'L1'
Attributes
----------
lr : float, optional
The learning rate. Default: 1e-1
lr_min : float, optional
The minimum of the learning rate. Default: 1e-8
iteration : int, optional
The number of iterations. Default: 30000
batch_size_max : int, optional
The maximum of the batch size. Defalt: 300
scale_inputs : bool, optional
If True, the inputs data will be normalized, otherwise, do nothing. Defalt: True
scale_target : bool, optional
If True, the outputs (or target) data will be normalized, otherwise, do nothing. Defalt: True
scale_type : str, optional
The normalization method, 'minmax', 'mean', or 'z_score'. Default: 'z_score'
fix_initialize : bool, optional
If True, the network will be initialized from a specific seed. Default: True
print_info : bool, optional
If True, some information about the training process will be printed. Default: True
Note
----
Hyperparameters of the ANN, such as the number of hidden layers (``hidden_layer``),
the number of neurons (``mid_node``), hyperparameter model (``hp_model``), should be optimized
before reconstructing functions from data. See https://doi.org/10.3847/1538-4365/ab620b for details.
"""
def __init__(self,data,hidden_layer=1,mid_node=4096,hp_model='rec_1',loss_func='L1'):
self.data = data
self.inputs = np.reshape(data[:,0],(-1,1))
self.target = data[:,1:]
self.mid_node = mid_node
self.hidden_layer = hidden_layer
self.hp_model = hp_model
self.lr = 1e-1
self.lr_min = 1e-8
self.iteration = 30000
self.batch_size_max = 300
self.batch_size = self._batch_size()
self.loss_func = train.loss_funcs(name=loss_func)
self.scale_inputs = True
self.scale_target = True
self.scale_type = 'z_score'
self.fix_initialize = True
self.print_info = True
def _nodes(self):
return nodeframe.triangleNode_1(node_in=len(self.inputs[0]),node_mid=self.mid_node,node_out=len(self.target[0]),hidden_layer=self.hidden_layer)
def _hparams(self):
return hpmodel.models(self.hp_model)
def _net(self):
if self.fix_initialize:
torch.manual_seed(1000) # Fixed parameter initialization
self.nodes = self._nodes()
self.hparams = self._hparams()
self.net = fcnet.get_FcNet(nodes=self.nodes, hparams=self.hparams)
if self.print_info:
print(self.net)
def _batch_size(self):
num_train = len(self.data)
if num_train//2 < self.batch_size_max:
bs = num_train//2
else:
bs = self.batch_size_max
return bs
[docs] def statistic(self):
self.inputs_statistic = dp.Statistic(self.inputs).statistic()
self.target_statistic = dp.Statistic(self.target).statistic()
[docs] def train(self):
self._net()
self.transfer_net(prints=self.print_info)
self.optimizer = self._optimizer(name='Adam')
self.statistic()
self.transfer_data()
if self.scale_inputs:
self.inputs = dp.Normalize(self.inputs, self.inputs_statistic, norm_type=self.scale_type).norm()
if self.scale_target:
self.target = dp.Normalize(self.target, self.target_statistic, norm_type=self.scale_type).norm()
self.net, self.loss = self.train_1(inputs=self.inputs,target=self.target,repeat_n=1,set_seed=True,lr_decay=True,print_info=self.print_info,showIter_n=2000)
self.net = self.net.cpu()
return self.net, self.loss
[docs] def predict(self, xpoint=None, xspace=None):
"""Prediction
Parameters
----------
xpoint : array-like or None, optional
An array of :math:`X` points. Default: None
xspace : tuple, list, or None, optional
If not None, xpoint will be ignored, and it should be (xmin, xmax, npoint) or [xmin, xmax, npoint]. Default: None
Returns
-------
array-like
The reconstructed function.
"""
if xpoint is None:
if xspace is None:
xpoint = np.linspace(min(self.data[:,0]), max(self.data[:,0]), 100)
else:
xpoint = np.linspace(xspace[0], xspace[1], xspace[2])
x = np.copy(xpoint)
if self.scale_inputs:
xpoint = dp.Normalize(xpoint, self.inputs_statistic, norm_type=self.scale_type).norm()
y = evaluate.predict(self.net, np.reshape(xpoint, (-1,1)), use_GPU=self.use_GPU)##
if self.scale_target:
y = dp.InverseNormalize(y, self.target_statistic, norm_type=self.scale_type).inverseNorm()
self.func = np.c_[x, y]
return self.func
[docs] def save_net(self, path='func', obsName='Hz'):
path = path + '/nn'
save.mkdir(path)
full_path = path + '/ANN-%s_nodes%s.pt'%(obsName, str(self.nodes))
torch.save(self.net, full_path)
def _save_loss(self, path='func', obsName='Hz'):
path = path + '/nn'
fileName = 'ANN_loss-%s_nodes%s'%(obsName, str(self.nodes))
save.savenpy(path, fileName, self.loss)
[docs] def save_func(self, path='func', obsName='Hz', file_type='npy'):
fileName = 'ANN_%s_nodes%s'%(obsName, str(self.nodes))
if file_type=='txt':
save.savetxt(path, fileName, self.func)
elif file_type=='npy':
save.savenpy(path, fileName, self.func)
[docs] def plot_loss(self):
evaluate.plot_loss(self.loss)
[docs] def plot_func(self):
plt.figure(figsize=(8, 6))
plt.errorbar(self.data[:,0], self.data[:,1], yerr=self.data[:,2], fmt='ro', alpha=1, label='Observational data')
plt.plot(self.func[:,0], self.func[:,1], 'k', label=r'$\rm Predicted\ f(x)$', lw=2)
plt.fill_between(self.func[:,0], self.func[:,1]-self.func[:,2], self.func[:,1]+self.func[:,2],
label=r'$\rm\sigma_{f(x)}$', color='g', alpha=0.5)
plt.xlabel('x', fontsize=16)
plt.ylabel('f(x)', fontsize=16)
plt.legend(fontsize=16)
[docs]class OptimizeANN(object):
def __init__(self,truth,hidden_layers=[1,2,3],mid_nodes=[128,256]):
self.truth = truth
self.mid_nodes = mid_nodes
self.hidden_layers = hidden_layers
def _file_name(self, obsName, hidden_layer, mid_node):
nodes = nodeframe.triangleNode_1(node_in=1,node_mid=mid_node,node_out=2,hidden_layer=hidden_layer)
return 'ANN_%s_nodes%s'%(obsName, nodes)
[docs] def risk(self, predict, error):
return round(self.rss(predict) + sum(error**2), 3)
[docs] def get_risk(self,path='',obsName='Hz',file_type='npy'):
self.risks = {}
self.risk_mean = {}
for hidden_layer in self.hidden_layers:
rsk_1 = []
for mid_node in self.mid_nodes:
file_name = self._file_name(obsName,hidden_layer,mid_node)
if file_type=='txt':
f = np.loadtxt('%s/%s.txt'%(path,file_name))
elif file_type=='npy':
f = np.load('%s/%s.npy'%(path,file_name))
rsk_1.append(self.risk(f[:,1], f[:,2]))
self.risks[str(hidden_layer)] = rsk_1
self.risk_mean[str(hidden_layer)] = round(np.mean(rsk_1), 3)
[docs] def get_optimal(self):
self.optimal_layer = min(self.risk_mean, key=self.risk_mean.get)
minRisk_index = self.risks[self.optimal_layer].index(min(self.risks[self.optimal_layer]))
self.optimal_node = self.mid_nodes[minRisk_index]
self.optimal_risk = self.risks[self.optimal_layer][minRisk_index]
[docs] def plot_risk(self):
self.get_optimal()
plt.figure(figsize=(6*1.2*2,4.5*1.2))
plt.subplot(1,2,1)
for hidden_layer in self.hidden_layers:
plt.semilogx(self.mid_nodes, self.risks[str(hidden_layer)], '-o', label=r'$\rm Hidden\ layer: %s$'%hidden_layer, lw=2)
plt.title(r'$\rm Optimal\ layer: %s$'%self.optimal_layer, fontsize=16)
plt.legend(fontsize=16)
plt.xlabel('Number of neurons', fontsize=16)
plt.ylabel('Risk', fontsize=16)
plt.subplot(1,2,2)
plt.semilogx(self.mid_nodes, self.risks[self.optimal_layer], '-o', label=r'$\rm Hidden\ layer: %s$'%self.optimal_layer, lw=2)
plt.title(r'$\rm Optimal\ node: %s$'%self.optimal_node, fontsize=16)
plt.legend(fontsize=16)
plt.xlabel('Number of neurons', fontsize=16)
plt.ylabel('Risk', fontsize=16)
[docs]class RePredictANN(ANN):
"""Reconstruct function using the saved well-trained networks
Parameters
----------
data : array-like
An array with shape of (N, 3), where N is the number of data points, each column represents :math:`X, Y, \sigma_Y`.
hidden_layer : int, optional
The number of hidden layers. Default: 1
mid_node : int, optional
The number of nodes (or neurons) of the middle layer. Default: 4096
Attributes
----------
scale_inputs : bool, optional
If True, the inputs data will be normalized, otherwise, do nothing. Defalt: True
scale_target : bool, optional
If True, the outputs (or target) data will be normalized, otherwise, do nothing. Defalt: True
scale_type : str, optional
The normalization method, 'minmax', 'mean', or 'z_score'. Default: 'z_score'
"""
def __init__(self,data,hidden_layer=1,mid_node=4096):
self.data = data
self.inputs = np.reshape(data[:,0],(-1,1))
self.target = data[:,1:]
self.mid_node = mid_node
self.hidden_layer = hidden_layer
self.nodes = self._nodes()
self.scale_inputs = True
self.scale_target = True
self.scale_type = 'z_score'
[docs] def load_net(self, path='func', obsName='Hz'):
full_path = path + '/nn/ANN-%s_nodes%s.pt'%(obsName, str(self.nodes))
self.net = torch.load(full_path)
self.statistic()