java多线程批量拆分List导入数据库的实现过程

网友投稿 319 2022-11-27

java多线程批量拆分List导入数据库的实现过程

目录一、前言二、直接把list怼进mysql三、分组把list导入Mysql中四、多线程分批导入Mysql五、小结

一、前言

前两天做了一个导入的功能,导入开始的时候非常慢,导入2w条数据要1分多钟,后来一点一点的优化,从直接把list怼进Mysql中,到分配把list导入Mysql中,到多线程把list导入Mysql中。时间是一点一点的变少了。非常的爽,最后变成了10s以内。下面就展示一下过程。

二、直接把list怼进Mysql

使用mybatis的批量导入操作:

@Transactional(rollbackFor = Exception.class)

public int addFreshStudentsNew2(List list, String schoolNo) {

if (list == null || list.isEmpty()) {

return 0;

}

List studentEntityList = new LinkedList<>();

List enrollStudentEntityList = new LinkedList<>();

List allusersEntityList = new LinkedList<>();

for (FreshStudentAndStudentModel freshStudentAndStudentModel : list) {

EnrollStudentEntity enrollStudentEntity = new EnrollStudentEntity();

StudentEntity studentEntity = new StudentEntity();

BeanUtils.copyProperties(freshStudentAndStudentModel, studentEntity);

BeanUtils.copyProperties(freshStudentAndStudentModel, enrollStudentEntity);

String operator = TenancyContext.UserID.get();

String studentId = BaseUuidUtils.base58Uuid();

enrollStudentEntity.setId(BaseUuidUtils.base58Uuid());

enrollStudentEntity.setStudentId(studentId);

enrollStudentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());

enrollStudentEntity.setOperator(operator);

studentEntity.setId(studentId);

studentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());

studentEntity.setOperator(operator);

studentEntityList.add(studentEntity);

enrollStudentEntityList.add(enrollStudentEntity);

AllusersEntity allusersEntity = new AllusersEntity();

allusersEntity.setId(enrollStudentEntity.getId());

allusersEntity.setUserCode(enrollStudentEntity.getNemtCode());

allusersEntity.setUserName(enrollStudentEntity.getName());

allusersEntity.setSchoolNo(schoolNo);

allusersEntity.setTelNum(enrollStudentEntity.getTelNum());

allusersEntity.setPassword(enrollStudentEntity.getNemtCode()); //密码设置为考生号

allusersEntityList.add(allusersEntity);

}

enResult = enrollStudentDao.insertAll(enrollStudentEntityList);

stuResult = studentDao.insertAll(studentEntityList);

allResult = allusersFacade.insertUserList(allusersEntityList);

if (enResult > 0 && stuResult > 0 && allResult) {

return 10;

}

return -10;

}

Mapper.xml

insert into tb_enroll_student

id,

remark,

nEMT_aspiration,

nEMT_code,

nEMT_score,

student_id,

identity_card_id,

level,

major,

name,

nation,

secondary_college,

operator,

sex,

is_delete,

account_address,

native_place,

original_place,

used_name,

pictrue,

join_party_date,

political_status,

tel_num,

is_registry,

graduate_school,

create_time,

update_time

values

(

#{item.id,jdbcType=VARCHAR},

#{item.remark,jdbcType=VARCHAR},

#{item.nemtAspiration,jdbcType=VARCHAR},

#{item.nemtCode,jdbcType=VARCHAR},

#{item.nemtScore,jdbcType=VARCHAR},

#{item.studentId,jdbcType=VARCHAR},

#{item.identityCardId,jdbcType=VARCHAR},

#{item.level,jdbcType=VARCHAR},

#{item.major,jdbcType=VARCHAR},

#{item.name,jdbcType=VARCHAR},

#{item.nation,jdbcType=VARCHAR},

#{item.secondaryCollege,jdbcType=VARCHAR},

#{item.operator,jdbcType=VARCHAR},

#{item.sex,jdbcType=VARCHAR},

0,

#{item.accountAddress,jdbcType=VARCHAR},

#{item.nativePlace,jdbcType=VARCHAR},

#{item.originalPlace,jdbcType=VARCHAR},

#{item.usedName,jdbcType=VARCHAR},

#{item.pictrue,jdbcType=VARCHAR},

#{item.joinPartyDate,jdbcType=VARCHAR},

#{item.politicalStatus,jdbcType=VARCHAR},

#{item.telNum,jdbcType=VARCHAR},

#{item.isRegistry,jdbcType=TINYINT},

#{item.graduateSchool,jdbcType=VARCHAR},

now(),

now()

)

代码说明:

底层的mapper是通过逆向工程来生成的,批量插入如下,是拼接成类似: insert into tb_enroll_student()values (),()…….() ;

这样的缺点是,数据库一般有一个默认的设置,就是每次sql操作的数据不能超过4M。这样插入,数据多的时候,数据库会报错Packet for query is too large (6071393 > 4194304). You can change this value on the server by setting the max_allowed_packet' variable.,虽然我们可以通过

类似 修改 my.ini 加上 max_allowed_packet =67108864

67108864=64M

默认大小4194304 也就是4M

修改完成之后要重启mysql服务,如果通过命令行修改就不用重启mysql服务。

完成本次操作,但是我们不能保证项目单次最大的大小是多少,这样是有弊端的。所以可以考虑进行分组导入。

三、分组把list导入Mysql中

同样适用mybatis批量插入,区别是对每次的导入进行分组计算,然后分多次进行导入:

@Transactional(rollbackFor = Exception.class)

public int addFreshStudentsNew2(List list, String schoolNo) {

if (list == null || list.isEmpty()) {

return 0;

}

List studentEntityList = new LinkedList<>();

List enrollStudentEntityList = new LinkedList<>();

List allusersEntityList = new LinkedList<>();

for (FreshStudentAndStudentModel freshStudentAndStudentModel : list) {

EnrollStudentEntity enrollStudentEntity = new EnrollStudentEntity();

StudentEntity studentEntity = new StudentEntity();

BeanUtils.copyProperties(freshStudentAndStudentModel, studentEntity);

BeanUtils.copyProperties(freshStudentAndStudentModel, enrollStudentEntity);

String operator = TenancyContext.UserID.get();

String studentId = BaseUuidUtils.base58Uuid();

enrollStudentEntity.setId(BaseUuidUtils.base58Uuid());

enrollStudentEntity.setStudentId(studentId);

enrollStudentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());

enrollStudentEntity.setOperator(operator);

studentEntity.setId(studentId);

studentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());

studentEntity.setOperator(operator);

studentEntityList.add(studentEntity);

enrollStudentEntityList.add(enrollStudentEntity);

AllusersEntity allusersEntity = new AllusersEntity();

allusersEntity.setId(enrollStudentEntity.getId());

allusersEntity.setUserCode(enrollStudentEntity.getNemtCode());

allusersEntity.setUserName(enrollStudentEntity.getName());

allusersEntity.setSchoolNo(schoolNo);

allusersEntity.setTelNum(enrollStudentEntity.getTelNum());

allusersEntity.setPassword(enrollStudentEntity.getNemtCode()); //密码设置为考生号

allusersEntityList.add(allusersEntity);

}

int c = 100;

int b = enrollStudentEntityList.size() / c;

int d = enrollStudentEntityList.size() % c;

int enResult = 0;

int stuResult = 0;

boolean allResult = false;

for (int e = c; e <= c * b; e = e + c) {

enResult = enrollStudentDao.insertAll(enrollStudentEntityList.subList(e - c, e));

stuResult = studentDao.insertAll(studentEntityList.subList(e - c, e));

allResult = allusersFacade.insertUserList(allusersEntityList.subList(e - c, e));

}

if (d != 0) {

enResult = enrollStudentDao.insertAll(enrollStudentEntityList.subList(c * b, enrollStudentEntityList.size()));

stuResult = studentDao.insertAll(studentEntityList.subList(c * b, studentEntityList.size()));

allResult = allusersFacade.insertUserList(allusersEntityList.subList(c * b, allusersEntityList.size()));

}

if (enResult > 0 && stuResult > 0 && allResult) {

return 10;

}

return -10;

}

代码说明:

这样操作,可以避免上面的错误,但是分多次插入,无形中就增加了操作实践,很容易超时。所以这种方法还是不值得提倡的。

再次改进,使用多线程分批导入。

四、多线程分批导入Mysql

依然使用mybatis的批量导入,不同的是,根据线程数目进行分组,然后再建立多线程池,进行导入。

@Transactional(rollbackFor = Exception.class)

public int addFreshStudentsNew(List list, String schoolNo) {

if (list == null || list.isEmpty()) {

return 0;

}

List studentEntityList = new LinkedList<>();

List enrollStudentEntityList = new LinkedList<>();

List allusersEntityList = new LinkedList<>();

list.forEach(freshStudentAndStudentModel -> {

EnrollStudentEntity enrollStudentEntity = new EnrollStudentEntity();

StudentEntity studentEntity = new StudentEntity();

BeanUtils.copyProperties(freshStudentAndStudentModel, studentEntity);

BeanUtils.copyProperties(freshStudentAndStudentModel, enrollStudentEntity);

String operator = TenancyContext.UserID.get();

String studentId = BaseUuidUtils.base58Uuid();

enrollStudentEntity.setId(BaseUuidUtils.base58Uuid());

enrollStudentEntity.setStudentId(studentId);

enrollStudentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());

enrollStudentEntity.setOperator(operator);

studentEntity.setId(studentId);

studentEntity.setIdentityCardId(freshStudentAndStudentModel.getIdCard());

studentEntity.setOperator(operator);

studentEntityList.add(studentEntity);

enrollStudentEntityList.add(enrollStudentEntity);

AllusersEntity allusersEntity = new AllusersEntity();

allusersEntity.setId(enrollStudentEntity.getId());

allusersEntity.setUserCode(enrollStudentEntity.getNemtCode());

allusersEntity.setUserName(enrollStudentEntity.getName());

allusersEntity.setSchoolNo(schoolNo);

allusersEntity.setTelNum(enrollStudentEntity.getTelNum());

allusersEntity.setPassword(enrollStudentEntity.getNemtCode()); //密码设置为考生号

allusersEntityList.add(allusersEntity);

});

int nThreads = 50;

int size = enrollStudentEntityList.size();

ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

List> futures = new ArrayList>(nThreads);

for (int i = 0; i < nThreads; i++) {

final List EnrollStudentEntityImputList = enrollStudentEntityList.subList(size / nThreads * i, size / nThreads * (i + 1));

final List studentEntityImportList = studentEntityList.subList(size / nThreads * i, size / nThreads * (i + 1));

final List allusersEntityImportList = allusersEntityList.subList(size / nThreads * i, size / nThreads * (i + 1));

Callable task1 = () -> {

studentSave.saveStudent(EnrollStudentEntityImputList,studentEntityImportList,allusersEntityImportList);

return 1;

};

futures.add(executorService.submit(task1));

}

executorService.shutdown();

if (!futures.isEmpty() && futures != null) {

return 10;

}

return -10;

}

代码说明:

上面是通过应用ExecutorService 建立了固定的线程数,然后根据线程数目进行分组,批量依次导入。一方面可以缓解数据库的压力,另一个面线程数目多了,一定程度会提高程序运行的时间。缺点就是要看服务器的配置,如果配置好的话就可以开多点线程,配置差的话就开小点。

五、小结

通过使用这个操作真是不断的提高了,项目使用技巧也是不错。加油~~ 多线程哦~~

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:新手入门大数据,这有一条最完整的学习路径
下一篇:大数据领域的六年巨变
相关文章

 发表评论

暂时没有评论,来抢沙发吧~