Dynamics AX 2012 和 AX 2009 都支持将批处理作业分解为小的可管理片段,并行进行处理。并行处理对于提高批处理作业的吞吐量和响应时间,以及提高批处理作业窗口至关重要。有几种不同的方法都可以将一个大的批处理作业拆散为小任务。下面罗列三种常见的方法
1. 批量捆绑。
2. 独立任务模式。
3. 顶级采摘。
每种方法都有自己的优点和缺点。选择恰当的方法,甚至可以将通常需要几个小时完成的任务在几分钟的时间内完成。
这是四篇系列文章的第一篇,在后续的四篇文章中,将逐一说明每种方法。还将比较这三种技术在不同工作负载下的性能。我们采用一个实例,用这三种不同的方法将一个销售订单过帐发票。我将比较每种方法对不同工作负载的性能消耗。
注意:这里使用的代码只是一个例子。不要将其用于您的销售订单过帐需求。 AX 2012默认销售订单过帐功能使用更复杂和功能更丰富的方式处理此并行性。
批量捆绑
在此模型中,创建若干个可以容纳固定项目的捆(bundle)。 然后将需要处理的项目平均分配,放入捆(bundle)中。 每个工作线程处理一捆,一捆处理完毕后,接着处理下一捆(bundle)中的项目。 如果处理每捆(bundle)所花费的时间大致相同,则该模式最佳。 在理想的情况下,每个线程都将满负荷处理相同的工作量。 但是,如果由于数据或服务器硬件存在差异而,这种方法将不是最有效的,你可能需要等待最后几个较大的捆完成任务,而此时其他线程早已完成。
这里是示例代码。
DemoBatchBundles:
public class DemoBatchBundles extends RunBaseBatch { str 20 fromSalesOrder, toSalesOrder; #define.CurrentVersion(1) #localmacro.CurrentList fromSalesOrder, toSalesOrder #endmacro } public void new() { super(); } public container pack() { return [#CurrentVersion, #CurrentList]; } private void parmEndBlock( str _toSalesOrder) { toSalesOrder = _toSalesOrder; } private void parmStartBlock(str _fromSalesOrder) { fromSalesOrder= _fromSalesOrder; } void run() { SalesTable salesTable; SalesFormLetter formletter; Map SalesMap; info(fromSalesOrder+':'+toSalesOrder); /* Each task knows the range of work items it needs to process. This range information is already packed when the task is created */ while select * from salesTable where salesTable.salesId >= fromSalesOrder && salesTable.salesId <= toSalesOrder && salesTable.documentStatus == DocumentStatus::none { formletter = SalesFormLetter::construct(DocumentStatus::Invoice); formletter.getLast(); formletter.resetParmListCommonCS(); formletter.allowEmptyTable(formletter.initAllowEmptyTable(true)); SalesMap = new Map(Types::Int64,Types::Record); SalesMap.insert(salesTable.recid,salesTable); formletter.parmDataSourceRecordsPacked(SalesMap.pack()); formletter.createParmUpdateFromParmUpdateRecord(SalesFormletterParmData::initSalesParmUpdateFormletter(DocumentStatus::Invoice, FormLetter.pack())); formletter.showQueryForm(false); formletter.initLinesQuery(); formletter.update(salesTable, systemDateGet(), SalesUpdate::All, AccountOrder::None, false, false); } } public boolean unpack(container packedClass) { Version version = RunBase::getVersion(packedClass); switch(version) { case #CurrentVersion: [version,#CurrentList] = packedClass; break; default: return false; } return true; } public static DemoBatchBundles construct(str _fromSalesOrder, str _toSalesOrder) { DemoBatchBundles c; c = new DemoBatchBundles(); c.parmStartBlock(_fromSalesOrder); c.parmEndBlock(_toSalesOrder); return c; }
Job to Schedule the above batch:
/* Here tasks are created to process work items equivalent to the bundle size. The range between the fromSalesOrder and toSalesOrder is a bundle of work items. */ static void scheduleDemoBundlesJob(Args _args) { int blockSize=1000; //My bundle size BatchHeader batchHeader; DemoBatchBundles demoBatchBundles; SalesTable salesTable; str fromSalesOrder, toSalesOrder; str lastSalesId; BatchInfo batchInfo; int Counter=0; ttsBegin; select count(RecId) from salesTable where salesTable.salesId >= 'SO-00400001' && salesTable.salesId <= 'SO-00500000' && salesTable.documentStatus == DocumentStatus::none; if (salesTable.recid > 0) { batchHeader = BatchHeader::construct(); batchHeader.parmCaption(strFmt('Batch job for DemoBundlesBatch Invoice SalesOrders %1 thru %2', 'SO-00400001', 'SO-00500000')); while select salesid from salesTable order by salesTable.SalesId where salesTable.salesId >= 'SO-00400001' && salesTable.salesId <= 'SO-00500000' && salesTable.documentStatus == DocumentStatus::none { Counter += 1; if (Counter ==1) { fromSalesOrder = salesTable.salesid; } if (Counter == blockSize) { toSalesOrder = salesTable.salesid; /* Each task is created to process a bundle of work items (in this case a range of sales Orders)*/ demoBatchBundles = DemoBatchBundles::construct(fromSalesOrder, toSalesOrder); info(fromSalesOrder+' : ' + toSalesOrder); batchInfo = DemoBatchBundles.batchInfo(); BatchInfo.parmCaption(fromSalesOrder+' : ' + toSalesOrder); batchHeader.addTask(demoBatchBundles); Counter = 0; } lastSalesId = salesTable.SalesId; } // This is to handle the spillover // #SalesOrders in this last bundle will be less than the bundle size if (Counter > 0) { toSalesOrder = lastSalesId; demoBatchBundles = DemoBatchBundles::construct( fromSalesOrder, toSalesOrder); info(fromSalesOrder+' : ' + toSalesOrder); batchInfo = DemoBatchBundles.batchInfo(); BatchInfo.parmCaption(fromSalesOrder+' : ' + toSalesOrder); batchHeader.addTask(demoBatchBundles); } batchHeader.save(); } ttsCommit; info('Done'); }
假设,我们处理10万条记录
#已创建任务 | #批处理线程(在我的测试服务器) | #可并行执行的并行任务 |
100 | 10 | 10 |
前10个任务完成后,批处理框架将加载下10个任务并执行,以此类推; 在这种情况下,可能需要加载10次或更多次。