Deep Learning with Spark (BigDL)

No GPU? No problem, as long as you have a YARN cluster and a good sense of how the whole thing hangs together.

spark-submit --master yarn --properties-file bigdl/conf/spark-bigdl.conf --py-files bigdl-0.4.0-SNAPSHOT-python-api.zip,textclassifier.py --archives BigDL_venv.zip --driver-cores 2 --driver-memory 4g --executor-cores 1 --num-executors 128 --total-executor-cores 128 --executor-memory 4g textclassifier.py
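For context, the textclassifier.py that the command ships is expected to create its own SparkContext and initialise BigDL. A minimal skeleton might look like the sketch below; the BigDL calls reflect the 0.4-era Python API as I remember it and the names are placeholders, so treat it as an assumption rather than the actual script.

# Hypothetical skeleton of textclassifier.py -- not the original script.
from pyspark import SparkContext
from bigdl.util.common import create_spark_conf, init_engine  # BigDL 0.4-era helpers (assumed)

if __name__ == "__main__":
    # spark-bigdl.conf supplies the BigDL-specific Spark properties at submit time.
    sc = SparkContext(appName="text_classifier", conf=create_spark_conf())
    init_engine()  # initialise the BigDL engine on the driver and executors

    # ... build the text-classification model and run the Optimizer here ...

    sc.stop()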

Reference(s)

  1. BigDL

Pythoning without sudo

When the sys team is a burden,

  1. Get Python distribution of choice from https://www.python.org/downloads/
  2. Get pip from https://bootstrap.pypa.io/get-pip.py
  3. /path/to/your/python_dist/python get-pip.py
  4. /path/to/your/python_dist/python -m pip install your_module
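To confirm that the module landed in your private distribution rather than a system one, a quick sanity check (the module name below is just an example):

# Run with /path/to/your/python_dist/python; "numpy" is only an example module name.
import importlib

mod = importlib.import_module("numpy")
print(getattr(mod, "__version__", "unknown"))  # the version pip just installed
print(mod.__file__)  # should point somewhere under /path/to/your/python_dist/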

You are welcome!

Markov Chain Monte Carlo (MCMC)

Recently I have been doing some revision on Data Science as I have gotten bored of Big Data. When one relearns a subject, one understands it from a different perspective. Following is my short note; a minimal Metropolis sketch follows the list.

  1. Result/ Example: MCMC word decryption using the Metropolis variant, with word unigrams, word bigrams and letter trigrams.
  2. Motivation: Given a known equation with a set of parameters, one would like to a) get the probability of a set of parameters (and hence approximate each parameter’s distribution), and b) get the best set of parameters.
  3. Problem: There are infinitely many combinations of parameters, hence the calculation of the denominator (the normalizing constant) is intractable.
  4. Solution: MCMC
  5. Intuition:
    a) Monte Carlo – sampling methods which follow a general format, i.e.: i) identify the parameters, ii) sample values from a distribution, iii) compute the output, iv) aggregate the results.
    b) Markov Chain – localized (dependent) sampling: each new sample depends on the previous draw, which reduces the search space by introducing constraints that vary from algorithm to algorithm.
  6. Common variations (algorithms):
    a) Metropolis – for when i) the proposal distribution of the random walk is “symmetric”, i.e.: equal in any direction, and ii) the full conditional distributions are unknown, meaning the distributions of the parameters are not known.
    b) Metropolis-Hastings – same as a), but it also accommodates an asymmetric proposal distribution for the random walk.
    c) Gibbs Sampling – used when the full conditional distributions are known. The joint distribution can still be recovered, as shown by the Hammersley-Clifford Theorem.
  7. MCMC parameters to improve convergence:
    a) Burn-in – remove the initial records from the sampled set, as the chain is still moving erratically before it converges.
    b) Sampling interval (thinning) – keep every n-th sample so the retained samples are closer to i.i.d.
    c) Number of samples – enough draws to approximate the intended distribution.
    d) Perturbation size – the magnitude of the random-walk steps.
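As a toy illustration of points 5 and 6a (my own example, not the decryption exercise): the sketch below samples from a one-dimensional target density known only up to its normalizing constant, using a symmetric Gaussian random walk, so the intractable denominator cancels in the acceptance ratio.

import numpy as np

def metropolis(log_target, n_samples=10000, step=0.5, burn_in=1000, x0=0.0):
    """Random-walk Metropolis with a symmetric (Gaussian) proposal."""
    rng = np.random.default_rng(0)
    x, samples = x0, []
    for i in range(n_samples + burn_in):
        proposal = x + step * rng.standard_normal()  # symmetric random walk (perturbation size = step)
        # Accept with probability min(1, target(proposal) / target(x));
        # the normalizing constant cancels in this ratio.
        if np.log(rng.uniform()) < log_target(proposal) - log_target(x):
            x = proposal
        if i >= burn_in:  # burn-in: drop the early, erratic part of the chain
            samples.append(x)
    return np.array(samples)

# Unnormalized log-density of a Normal(2, 1) target; the constant term is deliberately dropped.
samples = metropolis(lambda x: -0.5 * (x - 2.0) ** 2)
print(samples.mean(), samples.std())  # roughly 2.0 and 1.0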

Reference(s):

  1. MCMC

Shipping PySpark venv

Problem: You have some fancy packages that the cluster doesn’t have in its default setup. You could ask the sys team to install them; however, you need to consider whether those packages are commonly used by the cluster’s users. If not, they become a burden to the cluster and hence a drag on its performance; cumulatively, this can mount up to a big resource hog. We can avoid that by shipping our environment on demand, so that it is only kept for that single application. The environment is discarded once the Spark application terminates, successfully or not. In the case of JARs, all the dependencies are already packaged, so this kind of problem never arises; it is specific to PySpark.

  1. Create a venv that contains the packages that both the driver and the workers need.
  2. Activate the environment and try running your task locally. If successful,
  3. zip -r /path/to/your_venv.zip /path/to/your_venv
  4. spark-submit --master yarn --archives /path/to/your_venv.zip your_script.py
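One more detail, assuming YARN and that the zip unpacks into a your_venv/ folder on the containers: the archive only gets extracted, it does not switch interpreters by itself, so point PySpark at the Python inside it, for example:
    spark-submit --master yarn --archives /path/to/your_venv.zip#your_venv --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./your_venv/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./your_venv/bin/python your_script.py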

Done.

Recurrent Neural Network (RNN)

RNN is a marriage of a Neural Network (NN) and a Hidden Markov Model (HMM). Back in college I learned only about NN and HMM, not RNN, but RNN is quite easy to understand with that background. I am learning RNN through a Natural Language Processing (NLP) problem.

Firstly, I have modularized my Neural Network code from the previous post such that I only have to provide a Layer class with input, output, error and gradient definitions.

In NLP, it is common to use Softmax normalization and a Cross Entropy (CE) loss function. Softmax turns raw values into probabilities, and CE measures the difference between two vectors, in our case the labels and the predicted probabilities. With the values turned into probabilities through Softmax, the magnitude of the distance is easier to interpret. An optimal model can be identified using Maximum Likelihood. Maximum Likelihood involves multiplications and can run into numerical stability problems when there are many values to multiply, hence the Log Likelihood is the better alternative. Since CE penalizes misclassification more strongly than Squared Error (SE) due to its underlying definition, CE is used instead of SE. Referring to the previous post, CE takes the place of SE, whereas Softmax sits between the “gate” and the output of the unit.

\begin{aligned} E_{total} = -\frac{1}{C}\sum_{c}y_c\log\hat{y}_c \end{aligned}
\begin{aligned} \frac{\partial E_{total}}{\partial w_{h_{11}z_1}} = \frac{\partial E_{total}}{\partial \hat{y}_1}\frac{\partial \hat{y}_1}{\partial \hat{y}_{rel, 1}}\frac{\partial \hat{y}_{rel, 1}}{\partial z_1}\frac{\partial z_1}{\partial w_{h_{11}z_1}} \\ \end{aligned}

where

\begin{aligned} \frac{\partial E_{total}}{\partial \hat{y}_1} = -\frac{1}{C} \frac{y_1}{\hat{y}_1} \end{aligned}
\begin{aligned} \hat{y}_{1} = \frac{e^{\hat{y}_{rel, 1}}}{\sum_{c}e^{\hat{y}_{rel, c}}} \end{aligned}
\begin{aligned} & \frac{\partial \hat{y}_1}{\partial \hat{y}_{rel, 1}} & & = \frac{e^{\hat{y}_{rel, 1}}\sum_{c}e^{\hat{y}_{rel, c}} - e^{\hat{y}_{rel, 1}}e^{\hat{y}_{rel, 1}}}{\left(\sum_{c}e^{\hat{y}_{rel, c}}\right)^2} \\ &&& = \hat{y}_1 - \hat{y}_1^2 \\ &&& = \hat{y}_1(1 - \hat{y}_1) \\ \end{aligned}
\begin{aligned} \hat{y}_{rel, 1} = tanhz_1 \end{aligned}
\begin{aligned} & \frac{\partial \hat{y}_{rel, 1}}{\partial z_1} & & = {sech}^2z_1\\ &&& = 1 - {tanh}^2z_1\\ &&& = 1 - {\hat{y}_{rel, 1}}^2\\ \end{aligned}
\begin{aligned} z_1 = \sum_{j} w_{h_{1j}z_1}y_{h_{1j}} \end{aligned}
\begin{aligned} \frac{\partial z_1}{\partial w_{h_{11}z_1}} = y_{h_{11}} \end{aligned}
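As a quick numerical sanity check of the Softmax and CE definitions above (the numbers are made up for illustration):

import numpy as np

y_rel = np.array([1.2, -0.3, 0.4])  # raw outputs, the "relative" values above
y_hat = np.exp(y_rel) / np.exp(y_rel).sum()  # Softmax: values -> probabilities
y = np.array([1.0, 0.0, 0.0])  # one-hot label

ce = -(y * np.log(y_hat)).mean()  # E_total = -(1/C) * sum_c y_c * log(y_hat_c)
print(y_hat, ce)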

To be continued….

Neural Network

Today I have started looking into Long Short Term Memory (LSTM). LSTM is a kind of Recurrent Neural Network (RNN). My first impression was that RNN is a combination of a Neural Network (NN) and a State Space Model, specifically a Hidden Markov Model (HMM), and that LSTM is an RNN with a filter. To understand them fully, I decided to code it all up, starting off with the Neural Network.

Following is my short note. There are no fancy visualizations for now, and I have tried to include as many related terms as possible, as they used to confuse me a lot.

Sample Neural Network coded from scratch

from sklearn import datasets
import numpy as np
import math

def sigFn(z):
    return 1.0/(1.0 + math.exp(-z))

def weiSum(x, w):
    return np.matmul(x, w)

def apply_row_fn(X, row_fn, **kwargs):
    return np.array([row_fn(X[i], i, **kwargs) for i in range(X.shape[0])])

def apply_ele_fn(x, i, ele_fn):
    return np.array([ele_fn(j) for j in x])

def addOne_row(x, i):
    return np.append(x, 1)

def remOne_row(x, i):
    return x[:-1]

def get_dEdy_row(x, i, y):
    return np.array([-0.5*((1 if y[i] == j else 0) - x[j]) for j in range(x.shape[0])])

def get_dydz_ele(y_hat):
    return -(1 - y_hat) * y_hat

def get_SE_row(x, i, y):
    return np.array([math.pow(((1 if y[i] == j else 0) - x[j]), 2) for j in range(n_class - 1)])

def get_pred_row(x, i, y):
    return np.array([(1 if np.argmax(np.append(x, 1 - np.sum(x))) == y[i] else 0)])

# Load Iris and append a bias term (the trailing 1) to every input row.
iris = datasets.load_iris()
y = iris.target
X = apply_row_fn(iris.data, addOne_row)

# Network shape: 4 input features (+ bias) -> n_h1_units hidden units (+ bias) -> n_class - 1 output units;
# the remaining class probability is recovered as 1 - sum of the others.
n_iter = 50
n_h1_units = 10
n_class = 3
W_h1_out = 2 * (np.random.rand(n_h1_units + 1, n_class - 1) - 0.5)  # hidden -> output weights in [-1, 1)
W_in_h1 = 2 * (np.random.rand(iris.data.shape[1] + 1, n_h1_units) - 0.5)  # input -> hidden weights in [-1, 1)
eta = 0.02  # learning rate

for i in range(n_iter):
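    # Forward pass: sigmoid hidden layer (bias term appended) followed by sigmoid output layer.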
    y_h1 = apply_row_fn(apply_row_fn(weiSum(X, W_in_h1), apply_ele_fn, ele_fn=sigFn), addOne_row)
    y_hat = apply_row_fn(weiSum(y_h1, W_h1_out), apply_ele_fn, ele_fn=sigFn)

    SE = apply_row_fn(y_hat, get_SE_row, y=y)
    ACC = apply_row_fn(y_hat, get_pred_row, y=y)
    print("Iter %s: SE = %s | ACC = %s" % (str(i), str(np.sum(SE)), str(100 * float(np.sum(ACC)) / X.shape[0])))

    # Backward pass: assemble the chain-rule factors for both weight matrices.
    dEdy = apply_row_fn(y_hat, get_dEdy_row, y=y)
    dydz = apply_row_fn(y_hat, apply_ele_fn, ele_fn=get_dydz_ele)
    dzdy_h1 = W_h1_out
    dy_h1dz_h1 = apply_row_fn(y_h1, apply_ele_fn, ele_fn=get_dydz_ele)
    dz_h1dw = X
    dz_dw_h1 = y_h1

    dEdz = dEdy * dydz

    # Weight updates: get_dEdy_row and get_dydz_ele above each carry an extra minus sign,
    # so adding eta * (...) here is equivalent to the usual w <- w - eta * gradient step.
    W_in_h1 = W_in_h1 + eta * apply_row_fn(np.matmul(np.transpose(dz_h1dw), np.matmul(dEdz, np.transpose(dzdy_h1)) * dy_h1dz_h1), remOne_row)
    W_h1_out = W_h1_out + eta * np.matmul(np.transpose(dz_dw_h1), dEdz)

# Iter 156: SE = 64.5131992554 | ACC = 79.3333333333
# Iter 157: SE = 64.5568255983 | ACC = 80.6666666667
# Iter 158: SE = 64.5982885577 | ACC = 82.0
# Iter 159: SE = 64.6376604297 | ACC = 82.0
# Iter 160: SE = 64.6750148136 | ACC = 82.6666666667
# Iter 161: SE = 64.7104259167 | ACC = 81.3333333333

Background (Skip this if one has some background on Neural Networks)

Let’s begin with the basics, the Neural Network. A Neural Network without a hidden layer can be thought of as Linear Regression, i.e. a weighted sum. Adding the Sigmoid function, it then becomes Logistic Regression.

Weighted sum,

\begin{aligned} & z_i = \sum_{j} W_{j}X_{ij} \\ \end{aligned}

where i is the index of data point, j is the index of feature and z_i is the predicted output.

With Sigmoid function,

\begin{aligned} & y_i & & = \frac{1}{1 + e^{-z_i}} \\ &&& = \frac{1}{1 + e^{-\sum_{j} w_{j}x_{ij}}}\\ \end{aligned}
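In code this is just a dot product followed by the Sigmoid; a tiny NumPy illustration with made-up numbers:

import numpy as np

w = np.array([0.5, -1.0, 0.25])  # one weight per feature
x_i = np.array([2.0, 1.0, 4.0])  # one data point

z_i = np.dot(w, x_i)  # weighted sum: the "Linear Regression" output
y_i = 1.0 / (1.0 + np.exp(-z_i))  # Sigmoid squashes it into (0, 1): "Logistic Regression"
print(z_i, y_i)  # 1.0 and ~0.731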

Now y_i has become the predicted output instead of z_i. “How do we find the solution, W?” you ask. We can use Least Squares or Gradient Descent; for the Neural Network we will use Gradient Descent, as covered in the training part below.

Generalizing this,

\begin{aligned} & y_i & & = f_1(z_i) \\ &&& = f_1(f_2(x_i))\\ \end{aligned}

where f_2 is the weighted sum of the features and f_1 is the identity for a plain weighted sum, or the Sigmoid function in the case of “Logistic Regression”. f_1 is called the activation function (or “gate”), z_i is the “input” for Neuron i and y_i is the “output”. This “Regression” unit is also called a Perceptron or Neuron. For simplicity, when using it as a classifier, a threshold can be set such that an output above the threshold belongs to one of the 2 possible classes. 1 Neuron can classify 2 classes whereas 2 Neurons can classify 3 classes. This is called a 1-vs-All structure.

Hidden layer(s) are introduced to allow the “system” to perform more complicated work, y_{i, new} = f_{hidden}(y_{i}). For example, when we were young we first learned the alphabet, then a set of weights W was learned that combines letters into words. After that, a “hidden layer” is added so that we know how to make sentences, and so on. One might ask, “Why don’t we just learn a W that lets us write an essay directly?”. Compare the combinations of letters in an essay with the combinations of words: there are fewer combinations with words, hence it is faster and easier to learn layer by layer, just as learning the alphabet is easier than memorizing whole words. In this context, the number of Neurons may differ from one hidden layer to another. The layer is “hidden” because the learned weights might not make any sense to us; it is knowledge-storing capacity.

When there are more layers, the learning capacity increases and hence the buzzword “Deep Learning”. Don’t get shook by it! One could think of it as aligning several batteries serially to increase the Voltage and guess what, Deep Learning is not limited to just Neural Network, it could be a serial combination of any other Machine Learning (ML) algorithms, even a different one at every level. However for the sake of simplicity, let’s limit it to just Neural Network.

What is left to know now is how do we train Neural Network.

For a classification problem, assuming that there are only 3 possible classes and no hidden layer, 2 neurons are needed (the third class is implied by the other two),

\begin{aligned} E_{total} = E_1 + E_2\\ E_1 = \frac{1}{2}(y_1 - \hat{y}_1)^2 \\ \hat{y}_1 = \frac{1}{1 + e^{-\sum_{j} w_{j}x_{ij}}} \end{aligned}

where y_1 \in \{0, 1\} is the ground truth, i.e.: y_1 = 1, y_2 = 0 if a certain data point belongs to class 1.

So the learning of weight between feature 1, x_1 and input for Neuron 1, z_1 would be

\begin{aligned} w_{x_1z_1} \gets w_{x_1z_1} - \eta\frac{\partial E_{total}}{\partial w_{x_1z_1}} \\ \frac{\partial E_{total}}{\partial w_{x_1z_1}} = \frac{\partial E_{total}}{\partial \hat{y}_1}\frac{\partial \hat{y}_1}{\partial w_{x_1z_1}} \\ \frac{\partial \hat{y}_1}{\partial w_{x_1z_1}} = \frac{\partial \hat{y}_1}{\partial z_1}\frac{\partial z_1}{\partial w_{x_1z_1}} \end{aligned}

A Neural Network with a single hidden layer is also known as a Multi Layer Perceptron (MLP). Assuming that there are 3 classes and 1 hidden layer with n_{h_1} neurons, there are 2 layers of weights to learn: 1. between the hidden layer and the output layer, 2. between the raw input and the hidden layer.

1. Between hidden layer and output layer

\begin{aligned} w_{h_{11}z_1} \gets w_{h_{11}z_1} - \eta\frac{\partial E_{total}}{\partial w_{h_{11}z_1}} \\ \frac{\partial E_{total}}{\partial w_{h_{11}z_1}} = \frac{\partial E_{total}}{\partial \hat{y}_1}\frac{\partial \hat{y}_1}{\partial w_{h_{11}z_1}} \\ \frac{\partial \hat{y}_1}{\partial w_{h_{11}z_1}} = \frac{\partial \hat{y}_1}{\partial z_1}\frac{\partial z_1}{\partial w_{h_{11}z_1}} \end{aligned}

2. Between input layer and hidden layer

\begin{aligned} w_{x_1h_{11}} \gets w_{x_1h_{11}} - \eta\frac{\partial E_{total}}{\partial w_{x_1h_{11}}} \\ \end{aligned}
\begin{aligned} \frac{\partial E_{total}}{\partial w_{x_1h_{11}}} = \sum_{i}\bigg(\frac{\partial E_i}{\partial \hat{y}_i}\frac{\partial \hat{y}_i}{\partial z_i}\frac{\partial z_i}{\partial y_{h_{11}}}\bigg)\frac{\partial y_{h_{11}}}{\partial z_{h_{11}}}\frac{\partial z_{h_{11}}}{\partial w_{x_1h_{11}}} \\ \end{aligned}

where

\begin{aligned} & \frac{\partial E_i}{\partial \hat{y}_i} & & = -(y_i - \hat{y}_i) \\ & \frac{\partial \hat{y}_i}{\partial z_i} & & = \hat{y}_i(1 - \hat{y}_i) \\ & \frac{\partial z_i}{\partial y_{h_{11}}} & & = w_{h_{11}z_i} \\ & \frac{\partial y_{h_{11}}}{\partial z_{h_{11}}} & & = y_{h_{11}}(1 - y_{h_{11}}) \\ & \frac{\partial z_{h_{11}}}{\partial w_{x_1h_{11}}} & & = x_1 \\ \end{aligned}
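Plugging made-up numbers into this chain for a single hidden-to-output weight gives a feel for one Gradient Descent step:

# One hidden-to-output weight update with made-up numbers, using the derivatives above.
eta = 0.1
y_1, y_hat_1 = 1.0, 0.7  # label and prediction of output neuron 1
y_h11 = 0.4  # output of hidden unit h11
w = 0.8  # current weight between h11 and z1

dE_dyhat = -(y_1 - y_hat_1)  # = -0.3
dyhat_dz = y_hat_1 * (1 - y_hat_1)  # = 0.21 (sigmoid derivative)
dz_dw = y_h11  # = 0.4
w = w - eta * dE_dyhat * dyhat_dz * dz_dw
print(w)  # 0.80252: the weight moves so as to reduce the error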

to be continued… (RNN and LSTM)

Reference(s):

  1. LSTM
  2. RNN
  3. NN

Multi-Agent Logistics Optimization

This is a simplified version of a problem faced by logistics companies.

Problem: Given a number of dispatchers and a number of delivery locations, optimize the route of each dispatcher.

\begin{aligned} & \min & & \sum_{i,j,k} W_{ij}X_{ijk} \\ & \text{where} & & W := \text{Euclidean distance matrix} \\ &&& X := \text{Adjacency matrix} \\ & \text{s.t.} & & i, j \in \{1, \dots, \#Locations \} \\ &&& k \in \{1, \dots, \#Dispatchers \} \\ \end{aligned}

Solution:

Linear Programming. Computing the adjacency matrices for all dispatchers in a single run is very expensive and slow, so one could adopt a divide-and-conquer strategy for scalability. For example, during the first iteration set the number of dispatchers to 1 to get the overall shortest path, then cut it into 2 and rerun the optimizer on each of the parts. More constraints can be introduced to set the minimum and maximum amount of work to be carried out by each dispatcher.

from pulp import *
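import numpy as np

# Illustrative setup, assumed for completeness -- the original post does not show
# how range_num_dispatcher, range_num_loc and euc_mat were defined.
num_dispatcher, num_loc = 2, 6
range_num_dispatcher = [str(k) for k in range(num_dispatcher)]
range_num_loc = [str(i) for i in range(num_loc)]  # location "0" is the distribution centre
coords = np.random.rand(num_loc, 2)  # random 2-D delivery locations
euc_mat = np.linalg.norm(coords[:, None, :] - coords[None, :, :], axis=-1)  # pairwise Euclidean distances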
# Decision variables: one binary adjacency matrix per dispatcher (num_dispatcher x num_loc x num_loc).
adj_mats = LpVariable.dicts("adj_mats", (range_num_dispatcher, 
                                      range_num_loc, 
                                      range_num_loc),0,1,LpInteger)

prob = LpProblem("Logistics Problem", LpMinimize)

for i in range_num_loc:
    for k in range_num_dispatcher:
        # Ensuring no self-recursion.
        prob += adj_mats[k][i][i] == 0, ""
        
    # Ensuring one route will only be taken up by one dispatcher.
    for j in range_num_loc[int(i):]:
        prob += lpSum([adj_mats[k][i][j] 
                       for k in range_num_dispatcher if i != j] + 
                     [adj_mats[k][j][i] 
                       for k in range_num_dispatcher if i != j]) <= 1, ""          
    if int(i) > 0:
        # Ensuring every location is covered.
        prob += lpSum([adj_mats[k][i][j] for j in range_num_loc
                   for k in range_num_dispatcher if i != j]) == 1, ""
        prob += lpSum([adj_mats[k][j][i] for j in range_num_loc
                   for k in range_num_dispatcher if i != j]) == 1, ""

for k in range_num_dispatcher:
    # Ensuring every dispatcher will go through the distribution centre.
    prob += lpSum([adj_mats[k]["0"][j] for j in range_num_loc[1:]]) == 1, ""
    prob += lpSum([adj_mats[k][i]["0"] for i in range_num_loc[1:]]) == 1, ""

    for i in range_num_loc[1:]:
        # Ensuring exit of location if entered.
        prob += lpSum([adj_mats[k][i][j] for j in range_num_loc] + [adj_mats[k][j][i] for j in range_num_loc]) == lpSum([adj_mats[k][i][j] for j in range_num_loc]) * 2 , ""

# Objective function - Minimize travelling.
prob += lpSum([adj_mats[k][i][j] * euc_mat[int(i), int(j)] for k in range_num_dispatcher for i in range_num_loc for j in range_num_loc])

prob.solve()
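After solving, the chosen edges can be read back per dispatcher (continuing the illustrative setup assumed above; value() is PuLP's accessor for solved variables):

for k in range_num_dispatcher:
    route_edges = [(i, j) for i in range_num_loc for j in range_num_loc
                   if value(adj_mats[k][i][j]) > 0.5]
    print("Dispatcher %s: %s" % (k, route_edges))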


Pain in the Az…kaban

Azkaban is a pipelining system with scheduling capability, a good substitute for cron as it provides the typical manager functionalities, e.g.: monitoring, reruns, visualization and so on.

Due to the lack of examples and users, and the vague documentation, it is not easy to set up Azkaban in non-standalone mode. However, I was lucky. Albeit English educated, my Chinese is not at all bad, and I managed to find critical information in a Chinese blog which is not available elsewhere. Long story short, please take note of the following should you want to install it as well.

  1.  If working behind a proxy, for a fast POC, you can avoid using SSL in the web server by having:
    jetty.use.ssl=false
    jetty.port=1234
  2. For your webserver to work properly:
    azkaban.default.servlet.path=/index
    web.resource.dir=web/
  3. When using the AJAX API, use the actual URL (lalalala.prod.jp.local), not an IP address (localhost, 0.0.0.0, 127.0.0.1), if that makes any sense at all:
    curl -k -X POST --data "action=login&username=azkaban&password=azkaban" http://lalalala.prod.jp.local:8080

You can thank me later.

In case you do use SSL and are not familiar with it already:

  1. Create key
    openssl genrsa -aes128 -out jetty.key
  2. Create cert
    openssl req -new -x509 -sha256 -key jetty.key -out jetty.crt
  3. Adding cert to keystore/ truststore
    keytool -keystore keystore -import -alias jetty -file jetty.crt -trustcacerts

References:

  1. http://azkaban.github.io/azkaban/docs/latest/#webserver-setup
  2. http://www.eclipse.org/jetty/documentation/current/configuring-ssl.html#loading-keys-and-certificates
  3. https://unix.stackexchange.com/questions/347116/how-to-create-keystore-and-truststore-using-self-signed-certificate
  4. http://www.cnblogs.com/tannerBG/p/3835952.html

MySQL non-root setup

So, I was trying to set up the web server for Azkaban in order to automate or streamline job submission, and it requires a MySQL DB for the user authentication process (http://azkaban.github.io/azkaban/docs/latest/#database-setup). Again, as an untrusted employee like many others, I do not have sudo access, and that causes a lot of pain. Between waiting on the System Admins for a few working days and figuring out a non-root setup myself, I chose the latter and it was rewarding! Now I no longer need others for a MySQL setup ever again.


Installing

  1. Download the binaries:
    wget https://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.19-linux-glibc2.12-x86_64.tar.gz
  2. Unpack it:
    tar -zxvf mysql-5.7.19-linux-glibc2.12-x86_64.tar.gz
  3. Create necessary folders:
    mkdir /your/path/to/mysql_files
    mkdir /your/path/to/mysql_data

Initializing

  1. Initialize MySQL DB:
    /your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64/bin/mysqld --initialize --basedir=/your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64 --datadir=/your/path/to/mysql_data --pid-file=/your/path/to/mysql_files/any_name.pid --character-sets-dir=/your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64/share/charsets/ --general-log-file=/your/path/to/mysql_files/any_name.log --lc-messages-dir=/your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64/share/ --plugin-dir=/your/path/to/mysql_files/lib/plugin --slave-load-tmpdir=/your/path/to/mysql_files/tmp

Starting

  1. Start MySQL DB:
    /your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64/bin/mysqld --basedir=/your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64 --datadir=/your/path/to/mysql_data --pid-file=/your/path/to/mysql_files/any_name.pid --character-sets-dir=/your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64/share/charsets/ --general-log-file=/your/path/to/mysql_files/any_name.log --lc-messages-dir=/your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64/share/english/ --plugin-dir=/your/path/to/mysql_files/lib/plugin --slave-load-tmpdir=/your/path/to/mysql_files/tmp --socket=/your/path/to/mysql_files/any_name.sock

Accessing

  1. Getting into client:
    /your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64/bin/mysql --socket=/your/path/to/mysql_files/any_name.sock -u root -p
  2. Changing password:
    SET PASSWORD = PASSWORD('xxxxxxxx');

Shutting down

  1. Shutting down:
    /your/path/to/mysql-5.7.19-linux-glibc2.12-x86_64/bin/mysqladmin -u root -p --socket=/your/path/to/mysql_files/any_name.sock shutdown

Ta-Da! You get to interact with Sys. Admin less now = More productivity!