Divergence-free Gaussian processes
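
The goal is to model a 2-D velocity field with a Gaussian process whose sample paths are divergence-free by construction. Here is a sketch of the stream-function construction that the DivFree kernel below implements: place a GP prior on a scalar stream function, $\psi \sim \mathcal{GP}(0, k)$, with a squared-exponential kernel

$$
k(\mathbf{x}, \mathbf{x}') = \sigma_f^2 \exp\!\left(-\frac{\delta_x^2 + \delta_y^2}{2\ell^2}\right), \qquad \delta_x = x_1 - x_1', \quad \delta_y = x_2 - x_2',
$$

and define the velocity as $u = \partial\psi/\partial x_2$, $v = -\partial\psi/\partial x_1$, so that $\partial u/\partial x_1 + \partial v/\partial x_2 = 0$ identically. The induced matrix-valued kernel has blocks given by second derivatives of $k$ (these are the formulas in the DivFree code comments):

$$
K_{uu} = \frac{\sigma_f^2}{\ell^4}\, e^{-\frac{\delta_x^2 + \delta_y^2}{2\ell^2}} \left(\ell^2 - \delta_y^2\right), \qquad
K_{vv} = \frac{\sigma_f^2}{\ell^4}\, e^{-\frac{\delta_x^2 + \delta_y^2}{2\ell^2}} \left(\ell^2 - \delta_x^2\right), \qquad
K_{uv} = K_{vu} = \frac{\sigma_f^2}{\ell^4}\, e^{-\frac{\delta_x^2 + \delta_y^2}{2\ell^2}}\, \delta_x \delta_y.
$$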

Code
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
from scipy import stats
from copy import deepcopy
import pymc as pm
import pytensor
import pytensor.tensor as tt
from pymc.gp.cov import Covariance
from functools import partial
from pytensor.tensor.linalg import cholesky, eigh, solve_triangular
from scipy.stats import multivariate_normal
solve_lower = partial(solve_triangular, lower=True)
solve_upper = partial(solve_triangular, lower=False)
Code
# 40 x 40 prediction grid over [0, 4] x [0, 4]
m = 40
n = 40
s = np.linspace(0, 1, m) * 4
t = np.linspace(0, 1, n) * 4
[S, T] = np.meshgrid(s, t)
SS = S.flatten()
TT = T.flatten()
N = SS.shape[0]
# prediction inputs: column 0 is the x1 coordinate (t), column 1 is x2 (s)
Xpred = np.hstack([TT.reshape(N, 1), SS.reshape(N, 1)])
Code
np.random.seed(seed=10)
num_random_points = 7
#X_init = np.random.rand(num_random_points, 2)
#X_init[:,0] *= 4
#X_init[:,1] *= 4
X_init = np.array([[3, 4], 
                   [2, 1],
                   [0, 3.5], 
                   [2, 2], 
                   [3, 0], 
                   [1, 1],
                   [1.5, 3]]).reshape(num_random_points,2)
# observed velocity components, sampled from the ground-truth field
vel_x = -np.cos(X_init[:,0]) * np.sin(X_init[:,1])
vel_y =  np.sin(X_init[:,0]) * np.cos(X_init[:,1])
sigma_noise = 1e-6  # treat the observations as essentially noise-free
Code
vel_x_truth = -np.cos(Xpred[:,0]) * np.sin(Xpred[:,1]) 
vel_y_truth = np.sin(Xpred[:,0]) * np.cos(Xpred[:,1])
vel_mag_truth = np.sqrt(vel_x_truth**2 + vel_y_truth**2)
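
As a quick aside (an added check, not part of the original analysis): this field is divergence-free analytically, since ∂u/∂x1 + ∂v/∂x2 = sin(x1)sin(x2) - sin(x1)sin(x2) = 0. A finite-difference estimate on the grid confirms it numerically, up to O(h^2) truncation error:

Code
# rough numerical divergence of the ground-truth field
U = vel_x_truth.reshape(n, m)
V = vel_y_truth.reshape(n, m)
# axis 0 of the grid follows t (the x1 coordinate), axis 1 follows s (x2)
div_truth = np.gradient(U, t, axis=0) + np.gradient(V, s, axis=1)
print(np.abs(div_truth).max())  # small, limited only by the finite-difference error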
Code
plt.scatter(X_init[:,0], X_init[:,1], c='w', s=40, lw=1, edgecolor='k')
plt.quiver(X_init[:,0], X_init[:,1], vel_x, vel_y)
plt.show()

Code
# stack the locations twice: the first copy indexes the x-velocity observations,
# the second the y-velocity observations (matching the kernel's 2x2 block layout)
X_init = np.vstack([X_init, X_init])
print(X_init)
[[3.  4. ]
 [2.  1. ]
 [0.  3.5]
 [2.  2. ]
 [3.  0. ]
 [1.  1. ]
 [1.5 3. ]
 [3.  4. ]
 [2.  1. ]
 [0.  3.5]
 [2.  2. ]
 [3.  0. ]
 [1.  1. ]
 [1.5 3. ]]
Code
# stack the observations to match: x-velocities first, then y-velocities
y_init = np.vstack([vel_x.reshape(num_random_points,1), vel_y.reshape(num_random_points,1)]).flatten()
Code
y_init
array([-0.74922879,  0.35017549,  0.35078323,  0.37840125,  0.        ,
       -0.45464871, -0.00998243, -0.09224219,  0.4912955 , -0.        ,
       -0.37840125,  0.14112001,  0.45464871, -0.98751255])

Next, we code this kernel into PyMC and, starting from different priors, explore the MAP output.

Code
class DivFree(Covariance):
    def __init__(self, input_dim, sigma_f, l):
        super().__init__(input_dim)
        self.input_dim = input_dim
        self.sigma_f2 = tt.square(sigma_f)
        self.l2 = tt.square(l)
        
    def full(self, X, Xs=None):
        if Xs is None:
            Xs = X
        # each input stacks the observation locations twice (x-block, then y-block)
        m = int(X.shape[0] // 2)
        n = int(Xs.shape[0] // 2)

        X = X[0:m, :]
        Xs = Xs[0:n, :]

        m2 = m * 2
        n2 = n * 2

        del_X = self.get_X_minus_X(X, Xs, 0)
        del_Y = self.get_X_minus_X(X, Xs, 1)
        del_X2 = tt.square(del_X)
        del_Y2 = tt.square(del_Y)

        # assemble the 2x2 block covariance symbolically (tt.zeros rather than a
        # shared variable, so no stale buffer is carried between calls)
        K = tt.zeros((m2, n2))
        K = tt.set_subtensor(K[0:m, 0:n], self.dK_yy(del_X2, del_Y2))
        K = tt.set_subtensor(K[m:m2, n:n2], self.dK_xx(del_X2, del_Y2))
        K = tt.set_subtensor(K[0:m, n:n2], self.dK_yx(del_X2, del_Y2, del_X, del_Y))
        K = tt.set_subtensor(K[m:m2, 0:n], self.dK_yx(del_X2, del_Y2, del_X, del_Y))

        return K
        
    def get_X_minus_X(self, X, Xs, input_dim):
        k, v = X.shape[0], Xs.shape[0]
        M = X[:,input_dim].reshape(k,1)
        Ms = Xs[:,input_dim].reshape(v,1)
        return tt.reshape(M, (-1,1)) - tt.reshape(Ms, (1,-1))
    
    def dK_xx(self, del_X2, del_Y2):
        # (sigma_f^2*exp(-(x^2 + y^2)/(2*l^2))*(l^2 - x^2))/l^4
        return (self.sigma_f2 * \
                tt.exp(-(del_X2 + del_Y2)/(2*self.l2) ) * \
                (self.l2  - del_X2))/tt.square(self.l2)
        
    def dK_yy(self, del_X2, del_Y2):
        # (sigma_f^2*exp(-(x^2 + y^2)/(2*l^2))*(l^2 - y^2))/l^4
        return (self.sigma_f2 * 
                tt.exp(-(del_X2 + del_Y2)/(2*self.l2) ) * \
                (self.l2 - del_Y2))/tt.square(self.l2)
        
    #def dK_xy(self, del_X2, del_Y2):
    #    # - l^4*sigma_f^2*x*y*exp(-(l^2*(x^2 + y^2))/2)
    #    return tt.square(self.l2) * self.sigma_f2 * tt.sqrt(del_X2) * tt.sqrt(del_Y2) * \
    #            tt.exp(-0.5 * self.l2 * (del_X2 + del_Y2))
    
    def dK_yx(self, del_X2, del_Y2, del_X, del_Y):
        # (sigma_f^2*x*y*exp(-(x^2 + y^2)/(2*l^2)))/l^4
        return  (self.sigma_f2 * del_X * del_Y * \
                tt.exp(-(del_X2 + del_Y2)/(2*self.l2)))/tt.square(self.l2)
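
Before handing this kernel to a model, a minimal sanity check (added here, not part of the original analysis): the Gram matrix it assembles at fixed hyperparameters should be symmetric and positive semi-definite.

Code
# evaluate the block kernel on the stacked observation locations
K_test = DivFree(2, sigma_f=1.0, l=1.0).full(X_init).eval()
assert np.allclose(K_test, K_test.T)               # symmetric
assert np.linalg.eigvalsh(K_test).min() > -1e-8    # PSD up to round-off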
Code
class SquaredExp(Covariance):
    def __init__(self, input_dim, sigma_f, l):
        super().__init__(input_dim)
        self.input_dim = input_dim
        self.sigma_f2 = tt.square(sigma_f)
        self.l = l

    def full(self, X, Xs=None):
        if Xs is None:
            Xs = X
        return self.sigma_f2 * tt.exp(-0.5 * self.square_dist(X, Xs))

    def square_dist(self, X, Xs):
        # scale inputs by the lengthscale so the exponent is ||x - x'||^2 / (2 l^2),
        # matching the parameterization used in DivFree
        X = X / self.l
        X2 = tt.sum(tt.square(X), 1)
        if Xs is None:
            sqd = -2.0 * tt.dot(X, tt.transpose(X)) + (
                tt.reshape(X2, (-1, 1)) + tt.reshape(X2, (1, -1))
            )
        else:
            Xs = Xs / self.l
            Xs2 = tt.sum(tt.square(Xs), 1)
            sqd = -2.0 * tt.dot(X, tt.transpose(Xs)) + (
                tt.reshape(X2, (-1, 1)) + tt.reshape(Xs2, (1, -1))
            )
        return tt.clip(sqd, 0.0, np.inf)
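
As a quick check on the class above (added; it assumes the 1/l input scaling used in square_dist), the result should agree with PyMC's built-in ExpQuad kernel scaled by sigma_f^2:

Code
K_a = SquaredExp(2, sigma_f=1.5, l=0.7).full(X_init).eval()
K_b = 1.5**2 * pm.gp.cov.ExpQuad(2, ls=0.7)(X_init).eval()
assert np.allclose(K_a, K_b)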
Code
from pymc.gp.util import (
    JITTER_DEFAULT,
    conditioned_vars,
    replace_with_values,
    stabilize,
)
Code
with pm.Model() as model:

    sigma_f = pm.HalfNormal("sigma_f", sigma=2.0)
    l = pm.HalfNormal("l", sigma=1.0)
    cov = DivFree(2, sigma_f, l)
    gp = pm.gp.Marginal(cov_func=cov)
    # 'sigma' replaces the deprecated 'noise' keyword in recent PyMC releases
    y_ = gp.marginal_likelihood("y_", X=X_init, y=y_init - np.mean(y_init),
                                sigma=sigma_noise)
Code
with model:
    mp = pm.find_MAP()
Code
mp
{'sigma_f_log__': array(-0.3511984),
 'l_log__': array(0.43428008),
 'sigma_f': array(0.7038441),
 'l': array(1.54385121)}
Code
Xpred_large = np.vstack([Xpred, Xpred])

with model:
    post_mean, post_covar = gp.predict(Xpred_large, point=mp, diag=False)

# add back the mean that was subtracted before fitting
post_mean += np.mean(y_init)

Code
velocity_x_mean = post_mean[0:N]   # first block: x-component
velocity_y_mean = post_mean[N:]    # second block: y-component
velocity_mag = np.sqrt(velocity_x_mean**2 + velocity_y_mean**2)
Code
velocity_std = np.sqrt(np.diag(post_covar))
velocity_x_std = velocity_std[0:N]
velocity_y_std = velocity_std[N:]
Code
norm = matplotlib.colors.Normalize(vmin=np.min(velocity_mag),\
                                    vmax=np.max(velocity_mag))

fig = plt.figure(figsize=(15,4))
ax1 = plt.subplot(131)
c = ax1.contourf(T, S, velocity_mag.reshape(n, m), 50, cmap=plt.cm.turbo, norm=norm)
plt.quiver(Xpred[:,0], Xpred[:,1], velocity_x_mean, velocity_y_mean, headwidth=5, scale=10)
plt.scatter(X_init[0:num_random_points,0], X_init[0:num_random_points,1], c='w', s=70, lw=1, edgecolor='k')
cbar = plt.colorbar(c, pad=0.05, shrink=0.6)
cbar.ax.tick_params(labelsize=13)
ax1.set_yticklabels([])
ax1.set_xticklabels([])
#ax1.set_xlabel('(b)')
plt.xlabel(r'$x_1$')
plt.ylabel(r'$x_2$')
ax1.set_title('Posterior velocity mag. with vectors', fontsize=13)


norm = matplotlib.colors.Normalize(vmin=np.min(velocity_x_std),\
                                    vmax=np.max(velocity_x_std))

ax2 = plt.subplot(132)
c = ax2.contourf(T, S, velocity_x_std.reshape(n, m), 50, cmap=plt.cm.turbo, norm=norm)
plt.scatter(X_init[0:num_random_points,0], X_init[0:num_random_points,1], c='w', s=70, lw=1, edgecolor='k')
cbar = plt.colorbar(c, pad=0.05, shrink=0.6)
cbar.ax.tick_params(labelsize=13)
ax2.set_yticklabels([])
ax2.set_xticklabels([])
plt.xlabel(r'$x_1$')
plt.ylabel(r'$x_2$')
#ax2.set_xlabel('(c)')
ax2.set_title('Velocity-x std dev.', fontsize=13)

norm = matplotlib.colors.Normalize(vmin=np.min(velocity_y_std),\
                                    vmax=np.max(velocity_y_std))

ax3 = plt.subplot(133)
c = ax3.contourf(T, S, velocity_y_std.reshape(n, m), 50, cmap=plt.cm.turbo, norm=norm)
plt.scatter(X_init[0:num_random_points,0], X_init[0:num_random_points,1], c='w', s=70, lw=1, edgecolor='k')
cbar = plt.colorbar(c, pad=0.05, shrink=0.6)
cbar.ax.tick_params(labelsize=13)
ax3.set_yticklabels([])
ax3.set_xticklabels([])
plt.xlabel(r'$x_1$')
plt.ylabel(r'$x_2$')
#ax3.set_xlabel('(d)')
ax3.set_title('Velocity-y std dev.', fontsize=13)
plt.savefig('velocity_div_free.png', dpi=170, bbox_inches='tight', transparent=True)
plt.show()

Code
norm = matplotlib.colors.Normalize(vmin=np.min(vel_mag_truth),\
                                    vmax=np.max(vel_mag_truth))

fig = plt.figure(figsize=(4.8,4))
ax1 = plt.subplot()
c = ax1.contourf(T, S, vel_mag_truth.reshape(n, m), 50, cmap=plt.cm.turbo, norm=norm)
plt.quiver(Xpred[:,0], Xpred[:,1], vel_x_truth, vel_y_truth, headwidth=5, scale=10)
cbar = plt.colorbar(c, pad=0.05, shrink=0.6)
cbar.ax.tick_params(labelsize=13)
ax1.set_yticklabels([])
ax1.set_xticklabels([])
#ax1.set_xlabel('(a)')
#ax1.set_title('True velocity mag. with vectors', fontsize=13)
plt.xlabel(r'$x_1$')
plt.ylabel(r'$x_2$')
plt.title('Velocity vector & magnitude (contours)')
plt.savefig('velocity_true.png', dpi=170, bbox_inches='tight', transparent=True)

plt.show()

Standard approach – Independent GPs for each velocity component.

Code
with pm.Model() as model2:

    sigma_f = pm.HalfNormal("sigma_f", sigma=1)
    l = pm.HalfNormal("l", sigma=1.0)
    cov = SquaredExp(2, sigma_f, l)
    gp = pm.gp.Marginal(cov_func=cov)
    # x-velocity observations only (first half of y_init)
    y_ = gp.marginal_likelihood("y_", X=X_init[0:num_random_points,:],
                                y=y_init[0:num_random_points] - np.mean(y_init[0:num_random_points]),
                                sigma=1e-4)
Code
with model2:
    mp2 = pm.find_MAP()
Code
with model2:
    post_mean2, post_covar2 = gp.predict(Xpred, point=mp2, diag=False)

# add back the subtracted mean
post_mean2 += np.mean(y_init[0:num_random_points])
Code
with pm.Model() as model3:

    sigma_f = pm.HalfNormal("sigma_f", sigma=1)
    l = pm.HalfNormal("l", sigma=1.0)
    cov = SquaredExp(2, sigma_f, l)
    gp = pm.gp.Marginal(cov_func=cov)
    # y-velocity observations only (second half of y_init)
    y_ = gp.marginal_likelihood("y_", X=X_init[0:num_random_points,:],
                                y=y_init[num_random_points:] - np.mean(y_init[num_random_points:]),
                                sigma=1e-4)
Code
with model3:
    mp3 = pm.find_MAP()

with model3:
    post_mean3, post_covar3 = gp.predict(Xpred, point=mp3, diag=False)

# add back the subtracted mean
post_mean3 += np.mean(y_init[num_random_points:])

Code
velocity_x_mean_gp = post_mean2
velocity_y_mean_gp = post_mean3
velocity_mag_mean_gp = np.sqrt(velocity_x_mean_gp**2 + velocity_y_mean_gp**2 )
velocity_x_std_gp = np.sqrt(np.diag(post_covar2))
velocity_y_std_gp = np.sqrt(np.diag(post_covar3))
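
Before plotting, a rough test of the headline property (an added sketch using finite differences on the prediction grid): the posterior mean from the divergence-free kernel should have near-zero divergence everywhere, while the independent GPs carry no such constraint.

Code
def divergence(u, v):
    U, V = u.reshape(n, m), v.reshape(n, m)
    # axis 0 follows t (the x1 coordinate), axis 1 follows s (x2)
    return np.gradient(U, t, axis=0) + np.gradient(V, s, axis=1)

print(np.abs(divergence(velocity_x_mean, velocity_y_mean)).max())        # divergence-free kernel
print(np.abs(divergence(velocity_x_mean_gp, velocity_y_mean_gp)).max())  # independent GPs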
Code
norm = matplotlib.colors.Normalize(vmin=np.min(velocity_mag_mean_gp),\
                                    vmax=np.max(velocity_mag_mean_gp))

fig = plt.figure(figsize=(15,4))
ax1 = plt.subplot(131)
c = ax1.contourf(T, S, velocity_mag_mean_gp.reshape(n, m), 50, cmap=plt.cm.turbo, norm=norm)
plt.quiver(Xpred[:,0], Xpred[:,1], velocity_x_mean_gp, velocity_y_mean_gp, headwidth=5, scale=10)
plt.scatter(X_init[0:num_random_points,0], X_init[0:num_random_points,1], c='w', s=70, lw=1, edgecolor='k')
cbar = plt.colorbar(c, pad=0.05, shrink=0.6)
cbar.ax.tick_params(labelsize=13)
ax1.set_yticklabels([])
ax1.set_xticklabels([])
plt.xlabel(r'$x_1$')
plt.ylabel(r'$x_2$')
#ax1.set_xlabel('(e)')
ax1.set_title('Posterior velocity mag. with vectors', fontsize=13)


norm = matplotlib.colors.Normalize(vmin=np.min(velocity_x_std_gp),\
                                    vmax=np.max(velocity_x_std_gp))

ax2 = plt.subplot(132)
c = ax2.contourf(T, S, velocity_x_std_gp.reshape(n, m), 50, cmap=plt.cm.turbo, norm=norm)
plt.scatter(X_init[0:num_random_points,0], X_init[0:num_random_points,1], c='w', s=70, lw=1, edgecolor='k')
cbar = plt.colorbar(c, pad=0.05, shrink=0.6)
cbar.ax.tick_params(labelsize=13)
ax2.set_yticklabels([])
ax2.set_xticklabels([])
plt.xlabel(r'$x_1$')
plt.ylabel(r'$x_2$')
#ax2.set_xlabel('(f)')
ax2.set_title('Velocity-x std dev.', fontsize=13)

norm = matplotlib.colors.Normalize(vmin=np.min(velocity_y_std_gp),\
                                    vmax=np.max(velocity_y_std_gp))

ax3 = plt.subplot(133)
c = ax3.contourf(T, S, velocity_y_std_gp.reshape(n, m), 50, cmap=plt.cm.turbo, norm=norm)
plt.scatter(X_init[0:num_random_points,0], X_init[0:num_random_points,1], c='w', s=70, lw=1, edgecolor='k')
cbar = plt.colorbar(c, pad=0.05, shrink=0.6)
cbar.ax.tick_params(labelsize=13)
ax3.set_yticklabels([])
ax3.set_xticklabels([])
plt.xlabel(r'$x_1$')
plt.ylabel(r'$x_2$')
#ax3.set_xlabel('(g)')
ax3.set_title('Velocity-y std dev.', fontsize=13)
plt.savefig('velocity_gp.png', dpi=170, bbox_inches='tight', transparent=True)
plt.show()