随着互联网信息技术日新月异的发展一个海量数据爆炸的时代已经到来。如何有效地处理、分析这些海量的数据资源成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器。可以说如果不能很好的快速处理分析这些海量的数据资源将很快被市场无情地所淘汰。当然处理分析这些海量数据目前可以借鉴的方案有很多首先在分布式计算方面有Hadoop里面的MapReduce并行计算框架它主要针对的是离线的数据挖掘分析。此外还有针对实时在线流式数据处理方面的同样也是分布式的计算框架Storm也能很好的满足数据实时性分析、处理的要求。最后还有Spring Batch这个完全面向批处理的框架可以大规模的应用于企业级的海量数据处理。在这里我就不具体展开说明这些框架如何部署、以及如何开发使用的详细教程说明。我想在此基础上更进一步我们能否借鉴这些开源框架背后的技术背景为服务的企业或者公司量身定制一套符合自身数据处理要求的批处理框架。首先我先描述一下目前我所服务的公司所面临的一个用户数据存储处理的一个现状背景。目前移动公司一个省内在网用户数据规模达到几千万的规模数量级而且每个省已经根据地市区域对用户数据进行划分我们把这批数据存储在传统的关系型数据库上面基于Oracle地市是分区。移动公司的计费结算系统会根据用户手机话费的余额情况实时的通知业务处理系统给手机用户进行停机、复机的操作。业务处理系统收到计费结算系统的请求会把要处理的用户数据往具体的交换机网元上派发不同的交换机指令这里简单的可以称为Hlr停复机指令下面开始本文都简称Hlr指令。目前面临的现状是在日常情况下传统的C多进程的后台处理程序还能勉强的“准实时”地处理这些数据请求但是如果一旦到了每个月的月初几天要处理的数据量往往会暴增而C后台程序处理的效率并不高。这时问题来了往往会有用户投诉自己缴费了为什么没有复机或者某些用户明明已经欠费了但是还没有及时停机。这样的结果会直接降低客户对移动运营商支撑的满意度于此同时移动运营商本身也可能流失这些客户资源。自己认真评估了一下造成上述问题的几个瓶颈所在。一个省所有的用户数据都放在数据库的一个实体表中数据库服务器满打满算达到顶级小型机配置也可能无法满足月初处理量激增的性能要求可以说频繁的在一台服务器上读写IO开销非常巨大整个服务器处理的性能低下。处理这些数据的时候会同步地往交换机物理设备上发送Hlr指令在交换机没有处理成功这个请求指令的时候只能阻塞等待进一步造成后续待处理数据的积压。针对上述的问题本人想到了几个优化方案。数据库中的实体表能不能根据用户的归属地市进行表实体的拆分。即把一台或者几台服务器的压力进行水平拆分。一台数据库服务器就重点处理某一个或者几个地市的数据请求降低IO开销。由于交换机处理Hlr指令的时候存在阻塞操作我们能不能改成通过异步返回处理的方式把处理任务队列中的任务先下达通知给交换机然后交换机通过异步回调机制反向通知处理模块汇报任务的执行情况。这样处理模块就从主动的任务轮询等待变成等待交换机执行结果的异步通知这样它就可以专注地进行处理数据的派发不会受到某几个任务处理时长的限制从而影响到后面整批次的数据处理。数据库的实体表由于进行水平拆解能不能做到并行加载这样就会大大节约串行数据加载的处理时长。并行加载出来的待处理数据最好能放到一个批处理框架里面批处理框架能很好地根据要处理数据的情况进行配置参数调整从而很好地满足实时性的要求。比如月初期间可以加大处理参数的值提高处理效率。平常的时候可以适当降低处理参数的取值降低系统的CPU/IO开销。基于以上几点考虑得出如下图所示的设计方案的组件图下面就具体说明一下其中关键模块如何协同工作的。异步并行查询加载模块BatchQueryLoader支持传入多个数据源对象同时利用google-guava库中对于Future接口的扩展ListenableFuture来实现批量查询数据的并行加载。Future接口主要是用来表示异步计算的结果并且计算完成的时候只能用get()方法获取结果get方法里面其中有一个方法是可以设置超时时间的。在并行加载模块里面批量并行地加载多个数据源里面的实体表中的数据并最终反馈加载的结果集合。并行数据加载和串行数据加载所用的耗时可以简单用下面的图例来说明串行加载的总耗时是每个数据源加载耗时的总和。而并行加载的总耗时取决于最大加载的那个数据源耗时时长。注我们把每天要进行停复机处理的用户数据通过采集程序分地市分布采集到水平分库的notify_users提醒用户表并行异步批处理模块BatchTaskReactor内部是通过线程池机制来实现的接受异步并行查询加载模块BatchQueryLoader得到的加载结果数据放入线程池中进行任务的异步派发它最终就是通过Hlr派单指令异步任务执行HlrBusinessEventTask模块下发指令任务然后自己不断的从阻塞队列中获取待执行的任务列表进行任务的分派。与此同时他通过Future接口异步得到HlrBusinessEventTask派发指令的执行反馈结果。批量处理线程池运行参数配置加载BatchTaskConfigurationLoader加载线程池运行参数的配置把结果通知并行异步批处理模块BatchTaskReactor配置文件batchtask-configuration.xml的内容如下所示。?xml version1.0 encodingGBK? batchtask !-- 批处理异步线程池参数配置 -- jobpool namenewlandframework_batchtask attribute namecorePoolSize value15 / attribute namemaxPoolSize value30 / attribute namekeepAliveTime value1000 / attribute nameworkQueueSize value200 / /jobpool /batchtask其中corePoolSize表示保留的线程池大小workQueueSize表示的是阻塞队列的大小maxPoolSize表示的是线程池的最大大小keepAliveTime指的是空闲线程结束的超时时间。其中创建线程池方法ThreadPoolExecutor里面有个参数是unit它表示一个枚举即keepAliveTime的单位。说了半天这几个参数到底什么关系呢我举一个例子说明一下当出现需要处理的任务的时候ThreadPoolExecutor会分配corePoolSize数量的线程池去处理如果不够的话会把任务放入阻塞队列阻塞队列的大小是workQueueSize当然这个时候还可能不够怎么办。只能叫来“临时工线程”帮忙处理一下这个时候“临时工线程”的数量是maxPoolSize-corePoolSize当然还会继续不够这个时候ThreadPoolExecutor线程池会采取4种处理策略。现在具体说一下是那些处理策略。首先是ThreadPoolExecutor.AbortPolicy 中处理程序遭到拒绝将抛出运行时 RejectedExecutionException。然后是ThreadPoolExecutor.CallerRunsPolicy 中线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制能够减缓新任务的提交速度。其次是ThreadPoolExecutor.DiscardPolicy 中不能执行的任务将被删除。最后是ThreadPoolExecutor.DiscardOldestPolicy 中如果执行程序尚未关闭则位于工作队列头部的任务将被删除然后重试执行程序如果再次失败则重复此过程。如果要处理的任务没有那么多了ThreadPoolExecutor线程池会根据keepAliveTime设置的时间单位来回收多余的“临时工线程”。你可以把keepAliveTime理解成专门是为maxPoolSize-corePoolSize的“临时工线程”专用的。线程池参数的设定。正常情况下我们要如何设置线程池的参数呢我们应该这样设置I、workQueueSize阻塞队列的大小至少大于等于corePoolSize的大小。II、maxPoolSize线程池的大小至少大于等于corePoolSize的大小。III、corePoolSize是你期望处理的默认线程数个人觉得线程池机制的话至少大于1吧不然的话你这个线程池等于单线程处理任务了这样就失去了线程池设计存在的意义了。JMXJava Management Extensions批处理任务监控模块BatchTaskMonitor实时地监控线程池BatchTaskReactor中任务的执行处理情况具体就是任务成功/失败情况。介绍完毕了几个核心模块主要的功能那下面就依次介绍一下主要模块的详细设计思路。我们把每天要进行停复机处理的用户数据通过采集程序采集到notify_users表。首先定义的是我们要处理采集的通知用户数据对象的结构描述它对应水平分库的表notify_users的JavaBean对象。notify_users的表结构为了演示起见简单设计如下基于Oracle数据库create table notify_users(home_city number(3) /*手机用户的归属地市编码*/,msisdn number(15) /*手机号码*/,user_id number(15) /*手机用户的用户标识*/);对应JavaBean实体类NotifyUsers具体代码定义如下/** * filename:NotifyUsers.java * * Newland Co. Ltd. All rights reserved. * * Description:要进行批处理通知的用户对象 * author tangjie * version 1.0 * */ package newlandframework.batchtask.model; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; public class NotifyUsers { public NotifyUsers() { } // 用户归属地市编码(这里具体是:591表示福州/592表示厦门) private Integer homeCity; // 用户的手机号码 private Integer msisdn; // 用户标识 private Integer userId; public Integer getHomeCity() { return homeCity; } public void setHomeCity(Integer homeCity) { this.homeCity homeCity; } public Integer getMsisdn() { return msisdn; } public void setMsisdn(Integer msisdn) { this.msisdn msisdn; } public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId userId; } public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append(homeCity, homeCity).append(userId, userId) .append(msisdn, msisdn).toString(); } }异步并行查询加载模块BatchQueryLoader的类图结构我们通过并行查询加载模块BatchQueryLoader调用异步并行查询执行器BatchQueryExecutor来并行地加载不同数据源的查询结果集合。StatementWrapper则是对JDBC里面Statement的封装。具体代码如下所示/** * filename:StatementWrapper.java * * Newland Co. Ltd. All rights reserved. * * Description:Statement封装类 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import java.sql.Connection; import java.sql.Statement; public class StatementWrapper { private final String sql; private final Statement statement; private final Connection con; public StatementWrapper(String sql, Statement statement, Connection con) { this.sql sql; this.statement statement; this.con con; } public String getSql() { return sql; } public Statement getStatement() { return statement; } public Connection getCon() { return con; } }定义两个并行加载的异常类BatchQueryInterruptedException、BatchQueryExecutionException/** * filename:BatchQueryInterruptedException.java * * Newland Co. Ltd. All rights reserved. * * Description:并行查询加载InterruptedException异常类 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; public class BatchQueryInterruptedException extends RuntimeException { public BatchQueryInterruptedException(final String errorMessage, final Object... args) { super(String.format(errorMessage, args)); } public BatchQueryInterruptedException(final Exception cause) { super(cause); } }/** * filename:BatchQueryExecutionException.java * * Newland Co. Ltd. All rights reserved. * * Description:并行查询加载ExecutionException异常类 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; public class BatchQueryExecutionException extends RuntimeException { public BatchQueryExecutionException(final String errorMessage, final Object... args) { super(String.format(errorMessage, args)); } public BatchQueryExecutionException(final Exception cause) { super(cause); } }再抽象出一个批量查询接口主要是为了后续能扩展在不同的数据库之间进行批量加载。接口类BatchQuery定义如下/** * filename:BatchQuery.java * * Newland Co. Ltd. All rights reserved. * * Description:异步查询接口定义 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; public interface BatchQueryIN, OUT { OUT query(IN input) throws Exception; }好了现在封装一个异步并行查询执行器BatchQueryExecutor/** * filename:BatchQueryExecutor.java * * Newland Co. Ltd. All rights reserved. * * Description:异步并行查询执行器 * author tangjie * version 1.0 * */ package newlandframework.batchtask.parallel; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.collections.Closure; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.ForClosure; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; public class BatchQueryExecutor { private final static int FUTUREQUERYNUMBER 1; public BatchQueryExecutor() { } public IN, OUT ListOUT executeQuery(final CollectionIN inputs,final BatchQueryIN, OUT executeUnit) { ListenableFutureListOUT futures submitBatchTaskFutures(inputs,executeUnit); delegateAsynTask(futures); return getAsynResults(futures); } private IN, OUT ListenableFutureListOUT submitBatchTaskFutures( final CollectionIN inputs, final BatchQueryIN, OUT executeUnit) { final SetListenableFutureOUT result new HashSetListenableFutureOUT( inputs.size()); final ListeningExecutorService service MoreExecutors .listeningDecorator(Executors.newFixedThreadPool(inputs.size())); Closure futureQuery new Closure() { public void execute(Object input) { final IN p (IN) input; result.add(service.submit(new CallableOUT() { Override public OUT call() throws Exception { return executeUnit.query(p); } })); } }; Closure parallelTask new ForClosure(FUTUREQUERYNUMBER, futureQuery); CollectionUtils.forAllDo(inputs, parallelTask); service.shutdown(); return Futures.allAsList(result); } private OUT OUT getAsynResults(final ListenableFutureOUT futures) { try { return futures.get(); } catch (InterruptedException ex) { throw new BatchQueryInterruptedException(ex); } catch (ExecutionException ex) { throw new BatchQueryExecutionException(ex); } } private TYPE void delegateAsynTask( final ListenableFutureTYPE allFutures) { Futures.addCallback(allFutures, new FutureCallbackTYPE() { Override public void onSuccess(final TYPE result) { System.out.println(并行加载查询执行成功); } Override public void onFailure(final Throwable thrown) { System.out.println(并行加载查询执行失败); } }); } }