搜索
查看: 884|回复: 0

轻量级内存计算引擎(2)

[复制链接]

11

主题

1

回帖

292

积分

中级会员

积分
292
发表于 2018-10-18 19:10:44 | 显示全部楼层 |阅读模式
七、        关联计算 关联计算是关系型数据库的核心算法,在内存计算中应用广泛,比如:统计每年每月的订单数量和订单金额。
该算法对应Oracle的SQL语句为:
select   to_char(订单.订单日期,'yyyy') AS 年份,to_char(订单.订单日期,'MM') AS 月份,sum(订单明细.单价*订单明细.数量) AS 销售金额,count(1) AS 订单数量
from   订单明细 left join 订单 on 订单明细.订单ID=订单.订单ID
group   by to_char(订单.订单日期,'yyyy'),to_char(订单.订单日期,'MM')
用集算器实现上述算法时,加载数据的脚本不变,业务算法如下(algorithm_2.dfx)

A
1
=join(订单明细:子表,订单ID;订单:主表,订单ID)
2
=A1.groups(year(主表.订单日期):年份, month(主表.订单日期):月份; sum(子表.单价*子表.数量):销售金额, count(1):订单数量)
A1:将订单明细与订单关联起来,子表主表为别名,点击单元格可见结果如下

可以看到,集算器join函数与SQL join语句虽然作用一样,但结构原理大不相同。函数join关联形成的结果,其字段值不是原子数据类型,而是记录,后续可用“.”号表达关系引用,多层关联非常方便。
A2:分组汇总。
计算结果如下:
年份
月份
销售金额
订单数量
2012
7
28988
57
2012
8
26799
71
2012
9
27201
57
2012
10
37793.7
69
2012
11
49704
66




关联关系分很多类,上述订单和订单明细属于其中一类:主子关联。针对主子关联,只需在加载数据时各自按关联字段排序,业务算法中就可用归并算法来提高性能。例如:
=join@m(订单明细:子表,订单ID;订单:主表,订单ID)
函数join@m表示归并关联,只对同序的两个或多个表有效。
集算器的关联计算与RDB不同,RDR对所有类型的关联关系都采用相同的算法,无法进行有针对性的优化,而集算器采取分而治之的理念,对不同类型的关联关系提供了不同的算法,可进行有针对性的透明优化。
  
除了主子关联,最常用的就是外键关联,常用的外键表(或字典表)有分类、地区、城市、员工、客户等。对于外键关联,集算器也有相应的优化方法,即在数据加载阶段事先建立关联,如此一来业务算法就不必临时关联,性能因此提高,并发时效果尤为明显。另外,集算器用指针建立外键关联,访问速度更快。
比如这个案例:订单表的客户ID字段是外键,对应客户表(客户ID、客户名称、地区、城市),需要统计出每个地区每个城市的订单数量。
数据加载脚本(initData_3.dfx)如下:

A
1
=connect("orcl")
2
=A1.query("select   订单ID,客户ID,订单日期,运货费 from 订单").keys(订单ID)
3
=A1.query@x(“select   客户ID,地区,城市 from 客户”).keys(客户ID)
4
=A2.switch(客户ID,A3:客户ID)
5
=env(订单,A2)
6
=env(客户,A3)
A4:用函数switch建立外键关联,将订单表的客户ID字段,替换为客户表相应记录的指针。
业务算法脚本如下(algorithm_3.dfx)如下

A
1
=订单.groups(客户ID.地区:地区 ,客户ID.城市:城市;count(1):订单数量)
加载数据时已经建立了外键指针关联,所以A1中的“客户ID”表示:订单表的客户ID字段所指向的客户表记录,“客户ID.地区”即客户表的地区字段。
脚本中多处使用“.”号表达关联引用,语法比SQL直观易懂,遇到多表多层关联时尤为便捷。而在SQL中,关联一多如同天书。
        上述计算结果如下:
地区
城市
订单数量
东北
大连
40
华东
南京
89
华东
南昌
15
华东
常州
35
华东
温州
18



  
  
  
  
  
八、        内外混合计算 内存计算虽然快,但是内存有限,因此通常只驻留最常用、并发访问最多的数据,而内存放不下或访问频率低的数据,还是要留在硬盘,用到的时候再临时加载,并与内存数据共同参与计算。这就是所谓的内外混合计算。
下面举例说明集算器中的内外混合计算。
案例描述:某零售行业系统中,订单明细访问频率较低,数据量较大,没必要也没办法常驻内存。现在要将订单明细与内存里的订单关联起来,统计出每年每种产品的销售数量。
数据加载脚本(initData_4.dfx)如下:

A
1
=connect("orcl")
2
=A1.query@x("select   订单ID,客户ID,订单日期,运货费 from 订单 order by 订单ID").keys(订单ID)
4
=env(订单,A2)
业务算法脚本(algorithm_4.dfx)如下:

A
1
=connect("orcl")
2
=A1.cursor@x("select   订单ID,产品ID,数量 from 订单明细order by 订单ID")
3
=订单.cursor()
4
=joinx(A2:子表,订单ID; A3:主表,订单ID)
5
=A4.groups(year(主表.订单日期):年份,子表.产品ID:产品 ;sum(子表.数量):销售数量)
A2:执行SQL,以游标方式取订单明细,以便计算远超内存的大量数据。
A3:将订单表转为游标模式,下一步会用到。
A4:关联订单明细表和订单表。函数joinx与join@m作用类似,都可对有序数据进行归并关联,区别在于前者对游标有效,后者对序表有效。
A5:执行分组汇总。
  
九、        数据更新 数据库中的物理表总会变化,这种变化应当及时反映到共享的内存表中,才能保证内存计算结果的正确,这种情况下就需要更新内存。如果物理表较小,那么解决起来很容易,只要定时执行初始化数据脚本(initData.dfx)就可以了。但如果物理表太大,就不能这样做了,因为初始化脚本会进行全量加载,本身就会消耗大量时间,而且加载时无法进行内存计算。例如:某零售巨头订单数据量较大,从数据库全量加载到内存通常超过5分钟,但为保证一定的实时性,内存数据又需要5分钟更新一次,显然,两者存在明显的矛盾。
解决思路其实很自然,物理表太大的时候,应该进行增量更新,5分钟的增量业务数据通常很小,增量不会影响更新内存的效率。
要实现增量更新,就需要知道哪些是增量数据,不外乎以下三种方法:
方法A:在原表加标记字段以识别。缺点是会改动原表。
方法B:在原库创建一张“变更表”,将变更的数据记录在内。好处是不动原表,缺点是仍然要动数据库。
方法C:将变更表记录在另一个数据库,或文本文件Excel中。好处是对原数据库不做任何改动,缺点是增加了维护工作量。
集算器支持多数据源计算,所以方法B、C没本质区别,下面就以B为例更新订单表。
  
第一步,在数据库中建立“订单变更表”,继承原表字段,新加一个“变更标记”字段,当用户修改原始表时,需要在变更表同步记录。如下所示的订单变更表,表示新增1条修改2条删除1条。
订单ID(key)
客户ID
订单日期
运货费
变更标记
10247
VICTE
2012-07-08
101
新增
10248
VINET
2012-07-04
102
修改
10249
TOMSP
2012-07-05
103
修改
10250
HANAR
2012-07-08
104
删除
  
第二步,编写集算器脚本updatemem_4.dfx,进行数据更新。

A
B
1
=connect("orcl")

2
=订单cp=订单.derive()

3
=A1.query("select 订单ID,客户ID,订购日期 订单日期,运货费,变更标记 from 订单变更")

4
=订单删除=A3.select(变更标记=="删除")
=订单cp.select(订单删除.(订单ID).contain(订单ID))
5

=订单cp.delete(B4)
6
=订单新增=A3.select(变更标记=="新增")
=订单cp.insert@f(0:订单新增)
7
=订单修改=A3.select(变更标记=="修改")
=订单cp.select(订单修改.(订单ID).pos(订单ID))
8

=订单cp.delete(B7)
9

=订单cp.insert@f(0:订单修改)
10
=env(订单,订单cp)

11
=A1.execute("delete from 订单变更")

12
=A1.close()

A1:建立数据库连接。
A2:将内存中的订单复制一份,命名为订单cp。下面过程只针对订单cp进行修改,修改完毕再替代内存中的订单,期间订单仍可正常进行业务计算。
A3:取数据库订单变更表。
A4-B5:取出订单变更表中需删除的记录,在订单cp中找到这些记录,并删除。
A6-B6:取出订单变更表中需新增的记录,在订单cp中追加。
A7-B9:这一步是修改订单cp,相当于先删除再追加。也可用modify函数实现修改。
A10:将修改后的订单cp常驻内存,命名为订单。
A11-A12:清空“变更表”,以便下次取新的变更记录。
上述脚本实现了完整的数据更新,而实际上很多情况下只需要追加数据,这样脚本还会简单很多。
        脚本编写完成后,还需第三步:定时5分钟执行该脚本。   
        定时执行的方法有很多。如果集算器部署为独立服务,与Web应用没有共用JVM,那么可以使用操作系统自带的定时工具(计划任务或crontab),使其定时执行集算器命令(esprocx.exe或esprocx.sh)。
有些web应用有自己的定时任务管理工具,可定时执行某个JAVA类,这时可以编写JAVA类,用JDBC调用集算器脚本。
        如果web应用没有定时任务管理工具,那就需要手工实现定时任务,即编写JAVA类,继承java内置的定时类TimerTask,在其中调用集算器脚本,再在启动类中调用定时任务类。
        其中启动类myServle4为:
1.     import java.io.IOException;      
2.     import java.util.Timer;      
3.     import javax.servlet.RequestDispatcher;      
4.     import javax.servlet.ServletContext;      
5.     import javax.servlet.ServletException;      
6.     import javax.servlet.http.HttpServlet;      
7.     import javax.servlet.http.HttpServletRequest;      
8.     import javax.servlet.http.HttpServletResponse;      
9.     import org.apache.commons.lang.StringUtils;      
10.   public class myServlet4 extends HttpServlet {      
11.       private static final long serialVersionUID = 1L;      
12.       private Timer timer1 = null;      
13.       private Task task1;               
14.       public ConvergeDataServlet() {      
15.           super();      
16.       }      
17.       public void destroy() {      
18.           super.destroy();      
19.           if(timer1!=null){      
20.               timer1.cancel();      
21.           }      
22.       }      
23.       public void doGet(HttpServletRequest request, HttpServletResponse response)      
24.               throws ServletException, IOException {      
25.       }      
26.       public void doPost(HttpServletRequest request, HttpServletResponse response)      
27.               throws ServletException, IOException {      
28.           doGet(request, response);            
29.       }      
30.       public void init() throws ServletException {      
31.           ServletContext context = getServletContext();      
32.           // 定时刷新时间(5分钟)      
33.           Long delay = new Long(5);      
34.           // 启动定时器      
35.           timer1 = new Timer(true);      
36.           task1 = new Task(context);      
37.           timer1.schedule(task1, delay * 60 * 1000, delay * 60 * 1000);      
38.       }      
39.   }
定时任务类Task为:
11.  import java.util.TimerTask;      
12.  import javax.servlet.ServletContext;  
13.   import java.sql.*;
14.   import com.esproc.jdbc.*;   
15.  public class Task extends TimerTask{      
16.      private ServletContext context;      
17.      private static boolean isRunning = true;      
18.      public Task(ServletContext context){      
19.          this.context = context;      
20.      }      
21.      @Override     
22.      public void run() {      
23.          if(!isRunning){      
24.                com.esproc.jdbc.InternalConnection con=null;
25.                try {
26.                      Class.forName("com.esproc.jdbc.InternalDriver");
27.                      con =(com.esproc.jdbc.InternalConnection)DriverManager.getConnection("jdbc:esproc:local://");
28.                      ResultSet rs = con.executeQuery("call updatemem_4()");
29.                }
30.                catch (SQLException e){
31.                      out.println(e);
32.                }finally{
33.                    //关闭数据集
34.                          if (con!=null) con.close();
35.                }
36.          }      
37.      }      
38.  }      
十、        综合示例 下面,通过一个综合示例来看一下在数据源多样、算法复杂的情况下,集算器如何很好地实现内存计算:
案例描述:某B2C网站需要试算订单的邮寄总费用,以便在一定成本下挑选合适的邮费规则。大部分情况下,邮费由包裹的总重量决定,但当订单的价格超过指定值时(比如300美元),则提供免费付运。结果需输出各订单邮寄费用以及总费用。
其中订单表已加载到内存,如下:
Id
cost
weight
Josh1
150
6
Drake
100
3
Megan
100
1
Josh2
200
3
Josh3
500
1
邮费规则每次试算时都不同,因此由参数“pRule”临时传入,格式为json字符串,某次规则如下:
[{"field":"cost","minVal":300,"maxVal":1000000,"Charge":0},
{"field":"weight","minVal":0,"maxVal":1,"Charge":10},
{"field":"weight","minVal":1,"maxVal":5,"Charge":20},
{"field":"weight","minVal":5,"maxVal":10,"Charge":25},
{"field":"weight","minVal":10,"maxVal":1000000,"Charge":40}]
上述json串表示各字段在各种取值范围内时的邮费。第一条记录表示,cost字段取值在300与1000000之间的时候,邮费为0(免费付运);第二条记录表示,weight字段取值在0到1(kg)之间时,邮费为10(美元)。
思路:将json串转为二维表,分别找出filed字段为cost和weight的记录,再对整个订单表进行循环。循环中先判断订单记录中的cost值是否满足免费标准,不满足则根据重量判断邮费档次,之后计算邮费。算完各订单邮费后再计算总邮费,并将汇总结果附加为订单表的最后一条记录。
数据加载过程很简单,这里不再赘述,即:读数据库表,并命名为“订单表”。
业务算法相对复杂,具体如下:

A
B
C
D
1
= pRule.export@j()
/解析json,转二维表


2
=免费=A1.select(field=="cost")
/取免费标准,单条


3
=收费=A1.select(field=="weight").sort(-minVal)
/取收费阶梯,多条


4
=订单表.derive(postage)
/复制并新增字段


5
for A4
if 免费.minVal < A5.cost
>A5. postage= 免费.Charge

6


next

7

for 收费
if A5.weight > B7.minVal
>A5.postage=B7.Charge
8



next A5
9
=A4.record(["sum",,,A4.sum(postage)])



A1:解析json,将其转为二维表。集算器支持多数据源,不仅支持RDB,也支持NOSQL、文件、webService。
A2-A3:查询邮费规则,分为免费和收费两种。
A4:新增空字段postage。
A5-D8:按两种规则循环订单表,计算相应的邮费,并填入postage字段。这里多处用到流程控制,集算器用缩进表示,其中A5、B7为循环语句,C6、D8跳入下一轮循环,B5、C7为判断语句
A9:在订单表追加新纪录,填入汇总值。
计算结果如下:
Id
cost
weight
postage
Josh1
150
6
25
Drake
100
3
20
Megan
100
1
10
Josh2
200
3
20
Josh3
500
1
0
sum


75
  
        至此,本文详细介绍了集算器用作内存计算引擎的完整过程,同时包括了常用计算方法和高级运算技巧。可以看到,集算器具有以下显著优点:
l   结构简单实施方便,可快速实现内存计算;
l   支持多种调用接口,应用集成没有障碍;
l   支持透明优化,可显著提升计算性能;
l   支持多种数据源,便于实现混合计算;
l   语法敏捷精妙,可轻松实现复杂业务逻辑。
        关于内存计算,还有个多机分布式计算的话题,将在后续文章中进行介绍。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

 
 
大数据行业交流
大数据行业交流
大数据求职招聘
大数据求职招聘
站长电话:
15010106923
微信联系:
hb-0310
站长邮箱:
ab12-120@163.com
大数据中国微信

QQ   

版权所有: Discuz! © 2001-2013 大数据.

GMT+8, 2024-4-20 22:25 , Processed in 0.135682 second(s), 24 queries .

快速回复 返回顶部 返回列表