简介
这一章节我们讲解马尔科夫模型。给定一组随机变量(如顾客最近的交易日期),马尔科夫模型只根据前一个状态(前一个最近交易日期)的分部指示该变量最近的分布。
1、马尔科夫链基本原理
令 $$ S = {S_1,S_2,...,S_n} $$ 是一个有限的状态集,我们希望得到如下的结果: $$ P(S_n|S_{n-1},S_{n-2},...,S_{1}) \eqsim P(S_n|S_{n-1}) $$ 这个近似公式表明了一阶马尔科夫性质(markov property):系统在时间t+1的状态只与系统在时间t的状态有关。
那么,如果我们在严谨一点,将这个关联状态再往前面推一个时间点,得到的就是二阶马尔科夫模型: $$ P(S_n|S_{n-1},S_{n-2},...,S_{1}) \eqsim P(S_n|S_{n-1},S_{n-2}) $$
下面使用马尔科夫假设描述联合概率: $$ P(S_1,S_2,...,S_n) = \prod_{i=1}^n P(S_i,S_{i-1}) $$
总结:
-
如果一个随机序列的分布仅由其当前状态确定,则具有markov性质。具有这个性质的随机过程称为马尔科夫随机过程(markov random process)。
-
对于可观察的状态序列(即状态由数据可知),可以得到一个马尔科夫链模型(markov chain model,MCM),我们可以使用这个模型来做一些预测。
-
对于不可观察状态,会得到一个隐式马尔科夫模型(hidden markov model,HMM)
接下来我们给出将要用到的马尔科夫链的形式化表示
有限的状态空间(state space) $$ S={S_1,S_2,...S_n} $$
转移概率(transition properties)
函数$f:S \times S \to R$:转化结果为一个N阶方阵,其中N是状态的数量,其中:
- $0 \le f(a,b) \le 1$:即转移概率的值在0到1之间;
- $\sum_{b \in S} f(a,b) = 1$:即同一个状态转向当前所有可能的状态的概率和为1;
初始分布(initial distribution)
函数$g:S \times R$,其中:
- $0 \le g(a) \le 1$
- $\sum_{a\in S}g(a) = 1$
马尔科夫链是S中的一个随机过程:
- 时间0时,这个链的初始状态用分布函数$g$创建;
- 如果时间t时马尔科夫链的状态为$a$,则时间$t+1$时,对于各个$b \in S$,其状态为b的概率为$f(a,b)$
2、马尔科夫状态表的运用
假设我们已经得到了一张马尔科夫状态表。如下表所示的城市天气变化表:
|今天天气\明天天气|晴天(sunny)|有雨(rainy)|多云(cloudy)|有雾(foggy)| |-|-|-|-|-| |晴天(sunny)|0.6|0.1|0.2|0.1| |有雨(rainy)|0.5|0.2|0.2|0.1| |多云(cloudy)|0.1|0.7|0.1|0.1| |有雾(foggy)|0.0|0.3|0.4|0.3|
我们假设:
- 城市天气只包含四种可能,即晴天(sunny)、有雨(rainy)、多云(cloudy)和有雾(foggy);
- 一天中天气不会变化;
我们来解释这张表的含义,其实我们完全可以以4阶方阵的形式表示,但为了清晰说明,将其以表格展示。
其中每一个数字代表:当前的天气为XX时,明天的天气为XX的概率。例如,第一行第一列的数据0.6,就代表的是当前天气为晴天时,明天天气为晴天的概率为0.6.
我们就可以用马尔科夫性质的思想回答下面的问题:
如果今天的天气状态是晴天,那么明天是多云而且后天有雾的概率是多大?根据马尔科夫链的思想,明天是多余的概率只与今天有关,后天有雾的概率只与明天有关,因此:
P = 0.2(第一行第三列数据) * 0.1(第三行第四列)
= 0.02
可见,我们的重点在于由数据推导出马尔科夫状态矩阵的过程。
2、分析客户交易数据
假设我们有这样的一些交易数据:
55FRL8G23B,1381907878,2013-01-09,129
9SX0DJG9L4,1381907879,2013-01-09,34
0ANMD0T113,1381907880,2013-01-09,144
W1F4412PA8,1381907881,2013-01-09,26
22Z10EAYC3,1381907882,2013-01-09,167
3R56N17P1H,1381907883,2013-01-09,25
LK0P6K3DE4,1381907884,2013-01-09,25
3A4S4BMPMU,1381907885,2013-01-09,113
OF4CEO2814,1381907886,2013-01-09,138
9ICNYOFS41,1381907887,2013-01-09,79
N4EOB264U6,1381907888,2013-01-09,108
3C0WARCKYJ,1381907889,2013-01-09,204
PTT3BI00AZ,1381907890,2013-01-09,27
14UHHAVQ2Q,1381907891,2013-01-09,73
Z9GFE6TDKF,1381907892,2013-01-09,32
...
数据意义为(交易ID,顾客ID,交易日期,交易金额)。
2.1、生成状态序列
这一节的目标是将交易序列(transcation sequence)转化为一个状态序列(state sequence)。我们的应用的目的是为了生成以下的输出 $$ customerId(Date_1,Amount_1);(Date_2,Amount_2),...,(Date_n,Amount_n) $$ 其中日期按增序排序。
我们要把这个输出转化为一个状态序列,如下所示: $$ customerId,State_1,State_2,...,State_n $$
我们还需要将(purchase-date,amount)对的有序序列转换为一组马尔科夫链状态。状态由一个代号(两个字母)表示,各个字母的定义如下所示:
|上一次交易后经过的时间|与前次交易相比的交易额| |-|-| |S:小|L:显著小于| |M:中|E:基本相同| |L:大|G:显著大于|
因此,我们可以得到9种状态
|状态名|上一次交易后的经过时间与其次交易相比的交易额| |-|-| |SL|小:显著小于| |SE|小:基本相同| |SG|小:显著大于| |ML|中:显著小于| |ME|中:基本相同| |MG|中:显著大于| |LL|大:显著小于| |LE|大:基本相同| |LG|大:显著大于|
可以看到,我们的马尔科夫链模型有9个状态(9*9矩阵)。
转换过程可以使用一些小脚本实现,但是如果数据量过大,还是需要一个map过程。
2.2、使用MapReduce生成马尔科夫状态转移矩阵
3、Spark实现
3.1、主类
下面是spark的主任务类。下一章节会列出所使用到的辅助类。
package com.sunrun.movieshow.algorithm.markov;
import com.sunrun.movieshow.algorithm.common.SparkHelper;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.io.Serializable;
import java.util.*;
public class SparkMarkov {
public static void main(String[] args) {
JavaSparkContext sc = SparkHelper.getSparkContext("markov");
// 1.归约到不同的分区
JavaRDD<String> rdd = sc.textFile("data/markov").coalesce(8);
// 2.得到(顾客ID,(交易时间,交易额))
JavaPairRDD<String, Tuple2<Long, Integer>> pairRDD = rdd.mapToPair(line -> {
// tokens[0] = customer-id
// tokens[1] = transaction-id
// tokens[2] = purchase-date
// tokens[3] = amount
String[] tokens = line.split(",");
if (tokens.length != 4) {
return null;
} else {
long date = 0;
try {
date = DateUtil.getDateAsMilliSeconds(tokens[2]);
}
catch(Exception e) {
// 会有异常数据
}
int amount = Integer.parseInt(tokens[3]);
Tuple2<Long, Integer> V = new Tuple2<>(date, amount);
return new Tuple2<>(tokens[0], V);
}
});
/**
* (V31E55G4FI,(1356969600000,123))
* (301UNH7I2F,(1356969600000,148))
* (PP2KVIR4LD,(1356969600000,163))
* (AC57MM3WNV,(1356969600000,188))
* ...
*/
// 4.依据交易ID进行分组 - 用户信息在最后的状态链中是没用的,但他可以限定一些状态切换的内在关系,
// 这在很多算法过程中都是隐含条件,需要经验判断。
JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = pairRDD.groupByKey();
/**
* (V31E55G4FI,(1356969600000,123),1356969600000,148)....)
* ...
*/
// 5.创建马尔科夫状态序列 (c_id,<(time1,amount),(time2,amount)>) => (String)
JavaPairRDD<String, List<String>> stateSequence =
customerRDD.mapValues((Iterable<Tuple2<Long,Integer>> dateAndAmount) -> {
List<Tuple2<Long,Integer>> list = toList(dateAndAmount);
Collections.sort(list, TupleComparatorAscending.INSTANCE);
// now convert sorted list (be date) into a "state sequence"
List<String> stateSequence1 = toStateSequence(list);
return stateSequence1;
});
/**
* stateSequence.saveAsTextFile("out/" + UUID.randomUUID());
* 顾客id,与其相关的状态序列
* (K40E0LA5DL,[LL, MG, SG, SL, SG, MG, SL, SG, SL, SG, LL])
* (ICF0KFGK12,[SG, SL, SE, SG, LL, LG])
* (4N0B1U5HVG,[SG, ML, MG, SG, SL, SG, SL, SG, ML])
* (3KJR1907D9,[SG, SL, ML, SG, ML, LG])
* (47620I9LOD,[LG, SL, ML, MG, SG, SL, SG, SL, SG])
*/
// 6.接下来,我们将状态序列以窗口为2的方式依次移动,生成一个个形如((LL, MG),1)的对
JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> {
// 输出形式((fromState,toState),times)
ArrayList<Tuple2<Tuple2<String, String>, Integer>> mapperOutput = new ArrayList<>();
List<String> states = s._2;
if (states == null) {
return Collections.emptyIterator();
} else {
for (int i = 0; i < states.size() - 1; i++) {
String fromState = states.get(i);
String toState = states.get(i + 1);
Tuple2<String, String> k = new Tuple2<>(fromState, toState);
mapperOutput.add(new Tuple2<>(k, 1));
}
}
return mapperOutput.iterator();
});
/**
* model.saveAsTextFile("out/model");
* ((LG,LL),1)
* ((LL,MG),1)
* ((MG,SG),1)
* ((SG,SL),1)
* ((SL,ME),1)
* ((ME,MG),1)
*
*/
// 7.我们需要将这些结果组合归约,将相同的状态序列进行合并,即从(f,t),1) => ((f,t),3000)的形式。
JavaPairRDD<Tuple2<String, String>, Integer> markovModel = model.reduceByKey((a, b) -> a + b);
/**
* markovModel.saveAsTextFile("/out/markov");
* ((LL,SL),993)
* ((MG,LL),1859)
* ((LE,ME),25)
* ((SL,ME),1490)
* ((LG,ME),153)
* ((ML,LG),3991)
* ((ME,ME),58)
*/
// 8.我们格式化一下输出,将其形式转变为(f,t,times)的形式
JavaRDD<String> markovFormatModel = markovModel.map(t -> t._1._1 + "\t" + t._1._2 + "\t" + t._2);
// 9.将结果存储到HDFS服务器,当然也可以存储到本地,这时候解析类就要使用本地文件系统的API
String markovFormatModelStorePath = "hdfs://10.21.1.24:9000/markov/";
markovFormatModel.saveAsTextFile(markovFormatModelStorePath);
// 9.将最终结果进行转换,生成马尔科夫概率模型
MarkovTableBuilder.transport(markovFormatModelStorePath);
/** 这是一个state阶的方阵,A(ij)表示状态i到状态j的转化概率
* 0.03318,0.02441,0.6608,0.1373,0.007937,0.02398,0.06456,0.009522,0.03832
* 0.3842,0.02532,0.2709,0.1260,0.009590,0.01331,0.1251,0.01083,0.03477
* 0.6403,0.02881,0.05017,0.1487,0.01338,0.008602,0.08359,0.01446,0.01196
* 0.02081,0.002368,0.7035,0.01170,0.004638,0.1518,0.03901,0.005933,0.06030
* 0.4309,0.02140,0.2657,0.1277,0.009697,0.02006,0.08159,0.009530,0.03344
* 0.1847,0.01816,0.5316,0.1303,0.007175,0.02351,0.06265,0.008136,0.03379
* 0.01847,0.004597,0.6115,0.02068,0.003091,0.1346,0.06828,0.01424,0.1245
* 0.2250,0.01900,0.2427,0.06291,0.006335,0.03182,0.2757,0.03801,0.09847
* 0.2819,0.01944,0.1954,0.07990,0.008015,0.03202,0.2612,0.04427,0.07781
*/
}
// 将迭代器转化为数组
static List<Tuple2<Long,Integer>> toList(Iterable<Tuple2<Long,Integer>> iterable) {
List<Tuple2<Long,Integer>> list = new ArrayList<Tuple2<Long,Integer>>();
for (Tuple2<Long,Integer> element: iterable) {
list.add(element);
}
return list;
}
// 按时间进行排序
static class TupleComparatorAscending implements Comparator<Tuple2<Long, Integer>>, Serializable {
final static TupleComparatorAscending INSTANCE = new TupleComparatorAscending();
@Override
public int compare(Tuple2<Long, Integer> t1, Tuple2<Long, Integer> t2) {
// return -t1._1.compareTo(t2._1); // sorts RDD elements descending
return t1._1.compareTo(t2._1); // sorts RDD elements ascending
}
}
// 将一个有序的交易序列(List<Tuple2<Date,Amount>>)转化为状态序列(List<String>),其中各个元素分别表示一个马尔科夫状态。
static List<String> toStateSequence(List<Tuple2<Long,Integer>> list){
// 没有足够的数据
if(list.size() < 2){
return null;
}
List<String> stateSequence = new ArrayList<>();
// == 两两配对计算结果
Tuple2<Long, Integer> prior = list.get(0);
for (int i = 1; i < list.size(); i++) {
Tuple2<Long, Integer> current = list.get(i);
// === 计算时间差(天),由于数据是以ms计数的,因此需要转化为天(1d = 24*60*60*1000=86400000ms)
long dayDiff = (current._1 - prior._1) / 86400000;
// === 获取交易额信息
int priorAmount = prior._2;
int currentAmount = current._2;
// === 根据业务规则转化为字母表示
// ==== 处理时间关系
String dd = null;
if(dayDiff < 30){
dd = "S";
}else if(dayDiff < 60){
dd = "M";
}else {
dd = "L";
}
// ==== 处理金额关系: 使用两次交易额的比重
String ad = null;
if(priorAmount < 0.9 * currentAmount){
ad = "L";
}else if(priorAmount < 1.1 * currentAmount){
ad = "E";
}else{
ad = "G";
}
// === 组合结果
String element = dd + ad;
stateSequence.add(element);
// 大小为2的窗口前进一格
prior = current;
}
return stateSequence;
}
}
3.2、辅助类
日期处理类
package com.sunrun.movieshow.algorithm.markov;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateUtil {
static final String DATE_FORMAT = "yyyy-MM-dd";
static final SimpleDateFormat SIMPLE_DATE_FORMAT =
new SimpleDateFormat(DATE_FORMAT);
/**
* Returns the Date from a given dateAsString
*/
public static Date getDate(String dateAsString) {
try {
return SIMPLE_DATE_FORMAT.parse(dateAsString);
}
catch(Exception e) {
return null;
}
}
/**
* Returns the number of milliseconds since January 1, 1970,
* 00:00:00 GMT represented by this Date object.
*/
public static long getDateAsMilliSeconds(Date date) throws Exception {
return date.getTime();
}
/**
* Returns the number of milliseconds since January 1, 1970,
* 00:00:00 GMT represented by this Date object.
*/
public static long getDateAsMilliSeconds(String dateAsString) throws Exception {
Date date = getDate(dateAsString);
return date.getTime();
}
public static String getDateAsString(long timestamp) {
return SIMPLE_DATE_FORMAT.format(timestamp);
}
}
2、输入输出工具类
package com.sunrun.movieshow.algorithm.markov;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedReader;
//
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.util.LineReader;
/**
* This class provides convenient methods for accessing
* some Input/Output methods.
*
* @author Mahmoud Parsian (mahmoud.parsian@yahoo.com)
*
*/
public class InputOutputUtil {
public static void close(LineReader reader) {
if (reader == null) {
return;
}
//
try {
reader.close();
}
catch (Exception ignore) {
}
}
public static void close(OutputStream stream) {
if (stream == null) {
return;
}
//
try {
stream.close();
}
catch (Exception ignore) {
}
}
public static void close(InputStream stream) {
if (stream == null) {
return;
}
//
try {
stream.close();
}
catch (Exception ignore) {
}
}
public static void close(FSDataInputStream stream) {
if (stream == null) {
return;
}
//
try {
stream.close();
}
catch (Exception ignore) {
}
}
public static void close(BufferedReader reader) {
if (reader == null) {
return;
}
//
try {
reader.close();
}
catch (Exception ignore) {
}
}
}
3、马尔科夫状态处理类
package com.sunrun.movieshow.algorithm.markov;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 输入数据生成马尔科夫状态表
*/
public class MarkovTableBuilder {
// 状态的数量
private int numberOfState;
private int scale;
// 状态表
private double[][] table = null;
// 状态序列
private Map<String, Integer> states = null;
public MarkovTableBuilder(int numberOfState, int scale){
this(numberOfState);
this.scale = scale;
}
private MarkovTableBuilder(int numberOfState){
this.numberOfState = numberOfState;
table = new double[numberOfState][numberOfState];
initStates();
}
// 初始化state状态
private void initStates(){
states = new HashMap<String, Integer>();
states.put("SL", 0);
states.put("SE", 1);
states.put("SG", 2);
states.put("ML", 3);
states.put("ME", 4);
states.put("MG", 5);
states.put("LL", 6);
states.put("LE", 7);
states.put("LG", 8);
}
// 将状态信息添加到状态表
public void add(StateItem item){
// 获取该状态对应的角标
int row = states.get(item.fromState);
int column = states.get(item.toState);
table[row][column] = item.count;
}
public void normalize() {
//
// 拉普拉斯校正:一般通过将所有的计数+1来进行。see: http://cs.nyu.edu/faculty/davise/ai/bayesText.html
for (int r = 0; r < numberOfState; r++) {
boolean gotZeroCount = false;
for (int c = 0; c < numberOfState; c++) {
if(table[r][c] == 0) {
gotZeroCount = true;
break;
}
}
if (gotZeroCount) {
for (int c = 0; c < numberOfState; c++) {
table[r][c] += 1;
}
}
}
// normalize
for (int r = 0; r < numberOfState; r++) {
double rowSum = getRowSum(r);
for (int c = 0; c < numberOfState; c++) {
table[r][c] = table[r][c] / rowSum;
}
}
}
// 获取rowNumber行的累加结果
public double getRowSum(int rowNumber) {
double sum = 0.0;
for (int column = 0; column < numberOfState; column++) {
sum += table[rowNumber][column];
}
return sum;
}
/**
* 存储状态表:作为示例,只是将结果输出即可。
*/
private void persist() {
for (int row = 0; row < numberOfState; row++) {
StringBuilder builder = new StringBuilder();
for (int column = 0; column < numberOfState; column++) {
double element = table[row][column];
builder.append(String.format("%.4g", element));
if (column < (numberOfState - 1)) {
builder.append(",");
}
}
System.out.println(builder.toString());
}
}
public static void transport(String markovFormatDataPath){
List<StateItem> items = ReadDataFromHDFS.readDirectory(markovFormatDataPath);
MarkovTableBuilder markovTableBuilder = new MarkovTableBuilder(9);
for (StateItem item : items) {
markovTableBuilder.add(item);
}
// 归一化数据
markovTableBuilder.normalize();
// 存储马尔科夫状态表
markovTableBuilder.persist();
}
}
4、从文件系统读取数据的处理类
package com.sunrun.movieshow.algorithm.markov;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
/**
* Class containing a number of utility methods for manipulating
* Hadoop's SequenceFiles.
*
*
* @author Mahmoud Parsian
*
*/
public class ReadDataFromHDFS {
private ReadDataFromHDFS() {
}
public static List<StateItem> readDirectory(String path) {
return ReadDataFromHDFS.readDirectory(new Path(path));
}
public static List<StateItem> readDirectory(Path path) {
FileSystem fs;
try {
fs = path.getFileSystem(new Configuration());
} catch (IOException e) {
System.out.println("Unable to access the hadoop file system!");
throw new RuntimeException("Unable to access the hadoop file system!");
}
List<StateItem> list = new ArrayList<StateItem>();
try {
FileStatus[] stat = fs.listStatus(path);
for (int i = 0; i < stat.length; ++i) {
if (stat[i].getPath().getName().startsWith("part")) {
List<StateItem> pairs = readFile(stat[i].getPath(), fs);
list.addAll(pairs);
}
}
}
catch (IOException e) {
System.out.println("Unable to access the hadoop file system!");
throw new RuntimeException("Error reading the hadoop file system!");
}
return list;
}
@SuppressWarnings("unchecked")
public static List<StateItem> readFile(Path path, FileSystem fs) {
List<StateItem> list = new ArrayList<StateItem>();
FSDataInputStream stream = null;
BufferedReader reader = null;
try {
stream = fs.open(path);
reader = new BufferedReader(new InputStreamReader(stream));
String line;
while ((line = reader.readLine()) != null) {
// line = <fromState><,><toState><TAB><count>
String[] tokens = line.split("\t"); // TAB separator
if (tokens.length == 3) {
StateItem item = new StateItem(tokens[0], tokens[1], Integer.parseInt(tokens[2]));
list.add(item);
}
}
}
catch (IOException e) {
System.out.println("readFileIntoCoxRegressionItem() failed!");
throw new RuntimeException("readFileIntoCoxRegressionItem() failed!");
}
finally {
InputOutputUtil.close(reader);
InputOutputUtil.close(stream);
}
return list;
}
public static void main(String[] args) throws Exception {
String path = "hdfs://10.21.1.24:9000/markov/";
List<StateItem> list = readDirectory(path);
System.out.println("list="+list.toString());
}
}
5、马尔科夫状态实体类
package com.sunrun.movieshow.algorithm.markov;
/**
* TableItem represents an item of a Markov State Transition Model
* as a Tuple3<fromSate, toState, count>
*
*/
public class StateItem {
String fromState;
String toState;
int count;
public StateItem(String fromState, String toState, int count) {
this.fromState = fromState;
this.toState = toState;
this.count = count;
}
/**
* for debugging ONLY
*/
public String toString() {
return "{"+fromState+"," +toState+","+count+"}";
}
}
4、使用最终的马尔科夫表进行预测
前面我已经在代码注释中拿出了最后的分析结果:
||SL|SE|SG|ML|ME|MG|LL|LE|LG| |-|-|-|-|-|-|-|-|-|-| |SL|0.03318|0.02441|0.6608|0.1373|0.007937|0.02398|0.06456|0.009522|0.03832 |SE|0.3842|0.02532|0.2709|0.1260|0.009590|0.01331|0.1251|0.01083|0.03477 |SG|0.6403|0.02881|0.05017|0.1487|0.01338|0.008602|0.08359|0.01446|0.01196 |ML|0.02081|0.002368|0.7035|0.01170|0.004638|0.1518|0.03901|0.005933|0.06030 |ME|0.4309|0.02140|0.2657|0.1277|0.009697|0.02006|0.08159|0.009530|0.03344 |MG|0.1847|0.01816|0.5316|0.1303|0.007175|0.02351|0.06265|0.008136|0.03379 |LL|0.01847|0.004597|0.6115|0.02068|0.003091|0.1346|0.06828|0.01424|0.1245 |LE|0.2250|0.01900|0.2427|0.06291|0.006335|0.03182|0.2757|0.03801|0.09847 |LG|0.2819|0.01944|0.1954|0.07990|0.008015|0.03202|0.2612|0.04427|0.07781
参考
数据算法 —— Mahmoud Parsian(很喜欢该作者写的这本书,非常详细)
附录
1、ruby脚本
mark_plan.rb
#!/usr/bin/ruby
require '../lib/util.rb'
require 'Date'
xaction_file = ARGV[0]
model_file = ARGV[1]
userXactions = {}
model = []
states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]
# read all xactions
File.open(xaction_file, "r").each_line do |line|
items = line.split(",")
custID = items[0]
if (userXactions.key? custID)
hist = userXactions[custID]
else
hist = []
userXactions[custID] = hist
end
hist << items[2, items.size - 2]
end
#read model
File.open(model_file, "r").each_line do |line|
items = line.split(",")
#puts "#{line}"
row = []
items.each do |item|
row << item.to_i
#puts "#{item}"
end
model << row
#puts "#{row}"
end
# marketing time
userXactions.each do |cid, xactions|
seq = []
last_date = Date.parse "2000-01-01"
xactions.each_with_index do |xaction, index|
if (index > 0)
prevXaction = xactions[index-1]
prDate = Date.parse prevXaction[0]
prAmt = prevXaction[1].to_i
date = Date.parse xaction[0]
last_date = date
amt = xaction[1].to_i
daysDiff = date - prDate
amtDiff = amt - prAmt
if (daysDiff < 30)
dd = "S"
elsif (daysDiff < 60)
dd = "M"
else
dd = "L"
end
if (prAmt < 0.9 * amt)
ad = "L"
elsif (prAmt < 1.1 * amt)
ad = "E"
else
ad = "G"
end
seq << (dd + ad)
end
end
if (!seq.empty?)
last = seq[-1]
row_index = states.index(last)
row = model[row_index]
max_col = row.max
col_index = row.index(max_col)
next_state = states[col_index]
if (next_state.start_with?("S"))
next_date = last_date + 15
elsif (next_state.start_with?("M"))
next_date = last_date + 45
else
next_date = last_date + 90
end
#puts "#{cid}, #{last}, #{row_index}, #{max_col}, #{col_index}, #{next_state}, #{last_date}, #{next_date}"
puts "#{cid}, #{next_date}"
end
end
buy_xaction.rb
#!/usr/bin/ruby
require '../lib/util.rb'
require 'Date'
custCount = ARGV[0].to_i
daysCount = ARGV[1].to_i
visitorPercent = ARGV[2].to_f
custIDs = []
xactionHist = {}
# transition probability matrix
idGen = IdGenerator.new
1.upto custCount do
custIDs << idGen.generate(10)
end
xid = Time.now().to_i
date = Date.parse "2013-01-01"
1.upto daysCount do
numXaction = visitorPercent * custCount
factor = 85 + rand(30)
numXaction = (numXaction * factor) / 100
1.upto numXaction do
custID = custIDs[rand(custIDs.size)]
if (xactionHist.key? custID)
hist = xactionHist[custID]
lastXaction = hist[-1]
lastDate = lastXaction[0]
lastAmt = lastXaction[1]
numDays = date - lastDate
if (numDays < 30)
amount = lastAmt < 40 ? 50 + rand(20) - 10 : 30 + rand(10) - 5
elsif (numDays < 60)
amount = lastAmt < 80 ? 100 + rand(40) - 20 : 60 + rand(20) - 10
else
amount = lastAmt < 150 ? 180 + rand(60) - 30 : 120 + rand(40) - 20
end
else
hist = []
xactionHist[custID] = hist
amount = 40 + rand(180)
end
xaction = []
xaction << date
xaction << amount
hist << xaction
xid = xid + 1
puts "#{custID},#{xid},#{date},#{amount}"
end
date = date.next
end
所有评论(0)