独立任务模式
在这种情况下,通过为每个项目创建单独的线程来实现并行性。每个线程只包含一个项目。线程与项目之间为1:1映射关系。这将消除对项目进行预分配的时间消耗。由于每个项目由一个线程独立处理,负载分布将更加一致。此方法解决了将大量项目捆绑为一捆时,其它提前完成任务的线程空闲,等待其它线程完成的问题。
然而,如果项目数量巨大,势必需要创建大量的批处理线程。创建大量批处理线程的过程将消耗大量的时间,因为创建批处理线程时需要每次检查条件、依赖性、约束,并且需要从就绪状态执行一组新线程。
使用该方案过账一组销售订单发票的示例代码:
注意:这里使用的代码只是一个例子。不要将其用于您的销售订单过帐需求。 AX 2012默认销售订单过帐功能使用更复杂和功能更丰富的方式处理此并行性。
DemoBatchIndividualTasks:
public class DemoBatchIndividualTasks extends RunBaseBatch { str 20 SalesOrder; #define.CurrentVersion(1) #localmacro.CurrentList SalesOrder #endmacro } public void new() { super(); } public container pack() { return [#CurrentVersion, #CurrentList]; } private void parmSalesOrder(str _SalesOrder) { SalesOrder= _SalesOrder; } //Single Work item per thread void run() { SalesTable salesTable; SalesFormLetter formletter; Map SalesMap; select * from salesTable where salesTable.salesId == SalesOrder && salesTable.documentStatus == DocumentStatus::none; if (salesTable) { formletter = SalesFormLetter::construct(DocumentStatus::Invoice); formletter.getLast(); formletter.resetParmListCommonCS(); formletter.allowEmptyTable(formletter.initAllowEmptyTable(true)); SalesMap = new Map(Types::Int64,Types::Record); SalesMap.insert(salesTable.recid,salesTable); formletter.parmDataSourceRecordsPacked(SalesMap.pack()); formletter.createParmUpdateFromParmUpdateRecord(SalesFormletterParmData::initSalesParmUpdateFormletter(DocumentStatus::Invoice, FormLetter.pack())); formletter.showQueryForm(false); formletter.initLinesQuery(); formletter.update(salesTable, systemDateGet(), SalesUpdate::All, AccountOrder::None, false, false); } } public boolean unpack(container packedClass) { Version version = RunBase::getVersion(packedClass); switch(version) { case #CurrentVersion: [version,#CurrentList] = packedClass; break; default: return false; } return true; } public static DemoBatchIndividualTasks construct(str _SalesOrder) { DemoBatchIndividualTasks c; c = new DemoBatchIndividualTasks(); c.parmSalesOrder(_SalesOrder); return c; }
Job to Schedule the above batch:
static void scheduleDemoIndividualTasksJob(Args _args) { BatchHeader batchHeader; DemoBatchIndividualTasks demoBatchIndividualTasks; BatchInfo batchInfo; SalesTable salesTable; ttsBegin; select count(RecId) from salesTable where salesTable.salesId >= ‘SO-00400001’&& salesTable.salesId <= ‘SO-00500000’ && salesTable.documentStatus == DocumentStatus::none; if (salesTable.recid > 0) { batchHeader = BatchHeader::construct(); batchHeader.parmCaption(strFmt(‘Batch job for demoBatchIndividualTasks -Invoice SalesOrders %1 thru %2’, ‘SO-00400001’, ‘SO-00500000’)); while select * from salesTable where salesTable.salesId >= ‘SO-00400001’&& salesTable.salesId <= ‘SO-00500000’ && salesTable.documentStatus == DocumentStatus::none { /* Each task is created to process a single work item (in this case a single sales Order)*/ demoBatchIndividualTasks = DemoBatchIndividualTasks::construct( salesTable.salesid); batchInfo = demoBatchIndividualTasks.batchInfo(); BatchInfo.parmCaption(‘Invoicing : ‘+salesTable.salesid); batchHeader.addTask(demoBatchIndividualTasks); } batchHeader.save(); } ttsCommit; info(‘Done’); }
假设,我们处理10万条记录
#创建任务 | #批处理线程(在我的测试服务器) | #可并行执行的并行任务 |
100,000 | 10 | 10 |
一旦前10个任务完成,批处理框架将加载后10个任务并执行它们,在这种情况下,它可以加载10,000或更多次。