顶级采摘:
我们在捆绑中看到的问题是工作负载的不均匀分布。 该问题可以用“独立任务模式”来解决。 但是当有大量任务时,各个任务的框架开销又会非常严重,所以必须适当平衡两种模式。“Top Picking”是另一种批处理技术,将解决任务负荷分布不均的问题。 但是,如果项目的数量巨大,也会遇到与“独立任务模式”相同的问题。
在这种方法中,您将创建一个静态数量的任务(就像在Bundling中一样)。 但不进行预分配(就像在“独立任务模式”中一样)。 由于没有进行预分配,并且我们不依赖于Batch框架来分隔项目,因此 需要维护一个包含所有项目的临时表。 维护此临时表以跟踪项目的进度。维护临时表会有一些额外的开销,但比批处理框架的开销小得多。 临时表数据初始化后,线程可以通过从临时表中获取下一个可用项目开始处理,直到所有项目完成为止。 这意味着将不存在空闲的线程,而另一些线程满负荷的情况。 为了实现这一点,我们将使用PessimistiClock和ReadPast Hint关键字。 这些关键字确保线程在不死锁的情况下获取下一个可用的项目。
整个过程的伪代码如下所示:
- 使用项目填充暂存表.
- 可以使用 InsertRecordSet 或 RecordInsertList 提高插入效率。
- 创建包含“N”个预保留任务的批处理作业。
- 每个线程内
- 使用 悲观锁 获取下一条可用的项目(用 READPAST)。
- 项目可用后,对剩余交易保持加锁状态。
- 当前项目处理完毕后,将项目状态进行更新,循环获取下一条可用项目。
我们仍然使用相同的过账销售订单发票的示例进行说明:
注意:这里使用的代码只是一个例子。不要将其用于您的销售订单过帐需求。 AX 2012默认销售订单过帐功能使用更复杂和功能更丰富的方式处理此并行性。
DemoBatchTopPicking
Staging Table: demoTopPickProcessTrackTable
Field Name | Field Type |
SalesId | SalesIdBase |
ProcessedStatus | ENum:NoYes |
public class DemoBatchTopPicking extends RunBaseBatch { } public void new() { super(); } void run() { SalesTable salesTable; SalesFormLetter formletter; DemoTopPickProcessTrackTable demoTopPickProcessTrackTable; Map SalesMap; DemoTopPickProcessTrackTable.readPast(true); do { ttsBegin; // when it finds no more work item to process do-while loop will exit select pessimisticlock firstOnly * from demoTopPickProcessTrackTable where demoTopPickProcessTrackTable.ProcessedStatus == NoYes::No; select * from salesTable where salesTable.salesId == demoTopPickProcessTrackTable.SalesID &&; salesTable.documentStatus == DocumentStatus::none; if (salesTable) { formletter = SalesFormLetter::construct(DocumentStatus::Invoice); formletter.getLast(); formletter.resetParmListCommonCS(); formletter.allowEmptyTable(formletter.initAllowEmptyTable(true)); SalesMap = new Map(Types::Int64,Types::Record); SalesMap.insert(salesTable.recid,salesTable); formletter.parmDataSourceRecordsPacked(SalesMap.pack()); formletter.createParmUpdateFromParmUpdateRecord(SalesFormletterParmData::initSalesParmUpdateFormletter(DocumentStatus::Invoice, FormLetter.pack())); formletter.showQueryForm(false); formletter.initLinesQuery(); formletter.update(salesTable, systemDateGet(), SalesUpdate::All, AccountOrder::None, false, false); } if(demoTopPickProcessTrackTable) { demoTopPickProcessTrackTable.ProcessedStatus = NoYes::Yes; demoTopPickProcessTrackTable.update(); } ttsCommit; } while ( demoTopPickProcessTrackTable); } public static DemoBatchTopPicking construct() { DemoBatchTopPicking c; c = new DemoBatchTopPicking(); return c; }
Job to Schedule the above batch:
static void scheduleDemoBatchTopPickingJob(Args _args) { BatchHeader batchHeader; DemoBatchTopPicking demoBatchTopPicking; DemoTopPickProcessTrackTable demoTopPickProcessTrackTable; SalesTable salesTable; int totalNumberOfTasksNeeded = 10; int counter; ttsBegin; select count(RecId) from salesTable where salesTable.salesId == ‘SO-00400001’ && salesTable.salesId == ‘SO-00500000’ && salesTable.documentStatus == DocumentStatus::none; if (salesTable.recid &amp;amp;amp;amp;gt; 0) { //Populating the staging table with the work items insert_recordset demoTopPickProcessTrackTable (SalesId) select SalesId from salesTable where salesTable.SalesId == ‘SO-00500000’ && salesTable.SalesId == ‘SO-00500000’ && salesTable.DocumentStatus == DocumentStatus::None; update_recordSet demoTopPickProcessTrackTable setting processedStatus = NoYes::No; batchHeader = BatchHeader::construct(); batchHeader.parmCaption(strFmt(‘Batch job for demoBatchTopPicking -Invoice SalesOrders %1 thru %2’, ‘SO-00400001’, ‘SO-00500000’)); //Creating predefined number of tasks for(counter = 1; counter <= totalNumberOfTasksNeeded; counter++) { demoBatchTopPicking = DemoBatchTopPicking::construct(); batchHeader.addTask(demoBatchTopPicking); } batchHeader.save(); } ttsCommit; info(‘Done’); }
假设,我们处理10万条记录
#创建的任务 | #批处理线程(在我的测试服务器) | #可并行执行的并行任务 |
10 | 10 | 10 |
批处理整个时段,10个任务同时运行,直到队列中没有更多的项目(暂存表中没有要处理的项目)