第一句子网 - 唯美句子、句子迷、好句子大全
第一句子网 > JAVA多线程执行 等待返回结果 再执行

JAVA多线程执行 等待返回结果 再执行

时间:2024-08-02 11:57:28

相关推荐

JAVA多线程执行 等待返回结果 再执行

JAVA多线程执行,等待返回结果,再执行

1. 实现 Callable 接口

1)配置线程池

package com.neusoft.demo.server.config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.task.TaskExecutor;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;/*** 线程池配置、启用异步* * @author**/@EnableAsync(proxyTargetClass = true)@Configurationpublic class AsycTaskExecutorConfig {@Beanpublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();// 核心线程数taskExecutor.setCorePoolSize(50);//最大线程数taskExecutor.setMaxPoolSize(100);//最大队列数taskExecutor.setQueueCapacity(1000);// 线程的空闲时间taskExecutor.setKeepAliveSeconds(100);//线程前缀//taskExecutor.setThreadNamePrefix("asyncTaskExecutor-");//拒绝策略交给主线程执行taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return taskExecutor;}}

2)编写线程类,实现 Callable 接口

package com.neusoft.demo.server.thread;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.neusoft.demo.server.model.TrafficFlow;import mons.collections4.CollectionUtils;import mons.lang3.StringUtils;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestHighLevelClient;import mon.unit.TimeValue;import org.elasticsearch.index.query.BoolQueryBuilder;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.index.query.RangeQueryBuilder;import org.elasticsearch.index.query.TermQueryBuilder;import org.elasticsearch.index.query.TermsQueryBuilder;import org.elasticsearch.search.aggregations.AggregationBuilders;import org.elasticsearch.search.aggregations.BucketOrder;import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;import java.io.IOException;import java.util.List;import java.util.concurrent.Callable;/*** @author dume* @create -10-26 9:44**/@Scope("prototype")@Configurationpublic class ThreadTaskProcess implements Callable {private static final Logger log = LoggerFactory.getLogger("adminLogger");@Qualifier("MyHighLevelClient")@Autowiredprivate RestHighLevelClient rhlClient;@Value("${elasticsearch.motor-vehicle-node-name}")private String motorVehicleNode;private int searchnum;private 
String EndTime;private String BeginTime;private String Precision;private String DeviceCityCode;private String DeviceID;private String PlateColor;private String Direction;public ThreadTaskProcess getInstance(String EndTime, String BeginTime, String Precision, String DeviceCityCode, String DeviceID, String PlateColor, String Direction,int searchnum) {this.EndTime = EndTime;this.BeginTime = BeginTime;this.Precision = Precision;this.DeviceCityCode = DeviceCityCode;this.DeviceID = DeviceID;this.PlateColor = PlateColor;this.Direction = Direction;this.searchnum = searchnum;return this;}@Overridepublic Object call() throws Exception {//es请求SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();BoolQueryBuilder bqb = QueryBuilders.boolQuery();if (BeginTime != null && EndTime != null) {RangeQueryBuilder rqb = QueryBuilders.rangeQuery("PassTime");rqb.gte(BeginTime);rqb.lt(EndTime);rqb.format("yyyyMMddHHmmss");bqb.must(rqb);}if(StringUtils.isNotBlank(DeviceCityCode)){if(DeviceCityCode.contains(",")){String[] strings = DeviceCityCode.split(",");TermsQueryBuilder tqb = QueryBuilders.termsQuery("DeviceCityCode",strings);bqb.must(tqb);}else{TermQueryBuilder tqb = QueryBuilders.termQuery("DeviceCityCode",DeviceCityCode);bqb.must(tqb);}}if(StringUtils.isNotBlank(DeviceID)){TermQueryBuilder tqb = QueryBuilders.termQuery("DeviceID",DeviceID);bqb.must(tqb);}if(StringUtils.isNotBlank(PlateColor)){if(PlateColor.length()==1){TermQueryBuilder tqb = QueryBuilders.termQuery("PlateColor",PlateColor);bqb.must(tqb);}else{TermQueryBuilder tqb = QueryBuilders.termQuery("PlateColor","6");bqb.mustNot(tqb);}}if(StringUtils.isNotBlank(Direction)){TermQueryBuilder tqb = QueryBuilders.termQuery("Direction",Direction);bqb.must(tqb);}sourceBuilder.query(bqb);sourceBuilder.size(0);sourceBuilder.timeout(new TimeValue(600000));DateHistogramAggregationBuilder fieldBuilder = AggregationBuilders.dateHistogram("articles_over_time").field("PassTime").dateHistogramInterval(new 
DateHistogramInterval(Precision+"m")).format("yyyy/MM/dd HH:mm").order(BucketOrder.key(true));SearchRequest searchRequest = new SearchRequest(motorVehicleNode).source(sourceBuilder.aggregation(fieldBuilder));SearchResponse searchResponse = null;JSONArray array = new JSONArray();JSONArray arrayback = new JSONArray();try {searchResponse = rhlClient.search(searchRequest, RequestOptions.DEFAULT);Histogram histo = searchResponse.getAggregations().get("articles_over_time");List<Histogram.Bucket> buckets = (List<Histogram.Bucket>)histo.getBuckets();array = JSONArray.parseArray(JSONArray.toJSONString(buckets));if(CollectionUtils.isNotEmpty(array)){for(int i=0;i<array.size();i++){JSONObject object1 = new JSONObject();object1.put("num",String.valueOf(i+1));object1.put("datetime",array.getJSONObject(i).getString("keyAsString"));object1.put("flownumber",array.getJSONObject(i).getString("docCount"));arrayback.add(object1);}}} catch (IOException e1) {e1.printStackTrace();}TrafficFlow trafficFlow = new TrafficFlow();trafficFlow.setSearchnum(searchnum);if(CollectionUtils.isNotEmpty(arrayback)){List<TrafficFlow> list = JSONArray.parseArray(JSONArray.toJSONString(arrayback),TrafficFlow.class);trafficFlow.setTrafficFlows(list);}return trafficFlow;}}

3)编写逻辑,线程执行完成后获取返回值

package com.neusoft.demo.server.service.impl;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.neusoft.demo.server.model.BeginAndEndDate;import com.neusoft.demo.server.model.TrafficFlow;import com.neusoft.demo.server.service.VbdTrafficFlowService;import com.neusoft.demo.server.thread.ThreadTaskProcess;import com.neusoft.demo.server.utils.DateSplitUtils;import com.neusoft.demo.server.utils.RedisUtil;import mons.collections4.CollectionUtils;import org.elasticsearch.client.RestHighLevelClient;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.ObjectFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;import java.util.ArrayList;import parator;import java.util.Date;import java.util.List;import java.util.concurrent.FutureTask;import java.util.stream.Collectors;/*** @author dume* @create -10-20 17:54**/@Servicepublic class VbdTrafficFlowServiceImpl implements VbdTrafficFlowService {private static final Logger log = LoggerFactory.getLogger("adminLogger");@Qualifier("MyHighLevelClient")@Autowiredprivate RestHighLevelClient rhlClient;@Value("${elasticsearch.motor-vehicle-node-name}")private String motorVehicleNode;@Autowiredprotected ThreadPoolTaskExecutor executorService;@Autowiredprivate ObjectFactory<ThreadTaskProcess> processFactory;private static SimpleDateFormat simpleDateFormatOne = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static SimpleDateFormat simpleDateFormatTwo = new SimpleDateFormat("yyyyMMddHHmmss");private static int oneDayTimes = 24*60*60*1000;private static int fiveMinutes = 5*60*1000;private static int fifthyMinutes = 15*60*1000;private static int thirtyMinutes = 30*60*1000;private 
static int sixtyMinutes = 60*60*1000;@Overridepublic JSONObject getTrafficFlow(String EndTime, String BeginTime, String Precision, String DeviceCityCode, String DeviceID, String PlateColor, String Direction) {JSONObject object = new JSONObject();List<TrafficFlow> trafficFlows = new ArrayList();/*** 时间段处理*/int precisionMinutes = getMinutes(Precision);Date start = new Date();Date end = new Date();List<BeginAndEndDate> beginAndEndDateList = new ArrayList<>();//时间分片数try{start = simpleDateFormatTwo.parse(BeginTime);end = simpleDateFormatTwo.parse(EndTime);}catch (Exception e){e.printStackTrace();log.error("时间转化失败",e.getMessage());}if(end.getTime()-start.getTime()<=oneDayTimes){BeginAndEndDate beginAndEndDate = new BeginAndEndDate();beginAndEndDate.setBeginTime(BeginTime);beginAndEndDate.setEndTime(EndTime);beginAndEndDateList.add(beginAndEndDate);}else{//将时间转化为可以被精度整除的时间long startlong = start.getTime();long endlong = end.getTime();int precision = Integer.valueOf(Precision);precision = precision*60*1000;startlong = startlong%precision==0?startlong:startlong-(startlong%precision);endlong = endlong%precision==0?endlong:endlong+(precision-(endlong%precision));List<DateSplitUtils.DateSplit> dateSplits =DateSplitUtils.splitDate(new Date(startlong),new Date(endlong),DateSplitUtils.IntervalType.DAY,1);for (DateSplitUtils.DateSplit dateSplit : dateSplits) {BeginAndEndDate beginAndEndDate = new BeginAndEndDate();beginAndEndDate.setBeginTime(dateSplit.getStartDateTimeStr());beginAndEndDate.setEndTime(dateSplit.getEndDateTimeStr());beginAndEndDateList.add(beginAndEndDate);}}List<FutureTask<Object>> futureTasks = new ArrayList<FutureTask<Object>>();JSONObject data = new JSONObject();for(int i=0;i<beginAndEndDateList.size();i++){futureTasks.add(new FutureTask<>(processFactory.getObject().getInstance(beginAndEndDateList.get(i).getEndTime(),beginAndEndDateList.get(i).getBeginTime(),Precision,DeviceCityCode,DeviceID,PlateColor,Direction,i+1)));}// 加入 线程池for (FutureTask<Object> 
futureTask : futureTasks) {executorService.submit(futureTask);}// 获取线程返回结果for (int i = 0; i < futureTasks.size(); i++) {try {TrafficFlow trafficFlow = (TrafficFlow) futureTasks.get(i).get();if(null!=trafficFlow){trafficFlows.add(trafficFlow);}} catch (Exception e) {e.printStackTrace();log.error("多线程获取返回结果失败",e.getMessage());}}/*** 排序并去空*/List<TrafficFlow> backTrafficFlows = new ArrayList();if(CollectionUtils.isNotEmpty(trafficFlows)){trafficFlows = trafficFlows.stream().sorted(paring(TrafficFlow::getSearchnum)).collect(Collectors.toList());for(TrafficFlow trafficFlow : trafficFlows){if(CollectionUtils.isNotEmpty(trafficFlow.getTrafficFlows())){backTrafficFlows.addAll(trafficFlow.getTrafficFlows());}}int num =1;for(TrafficFlow trafficFlow : backTrafficFlows){trafficFlow.setNum(num);num++;}}data.put("dataList",backTrafficFlows);object.put("data",data);return object;}public int getMinutes( String Precision){int back;switch (Precision){case "5":back = fiveMinutes;break;case "15":back = fifthyMinutes;break;case "30":back = thirtyMinutes;break;case "60":back = sixtyMinutes;break;default:back = sixtyMinutes;break;}return back;}}

2. 实现 Runnable 接口,使用计数器(CountDownLatch)

// 创建线程public class MyThread implements Runnable {private List<MyVo> myVos;private CountDownLatch latch;public MyThread(List<MyVo> myVos, CountDownLatch latch){this.myVos = myVos;this.latch = latch;}@Overridepublic void run() {// 执行代码逻辑/***写逻辑*/// 计数器计数latch.countDown();}}// 创建线程池ExecutorService executorService = new ThreadPoolExecutor(12, 24,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>());public void executorMothod(){try {List<List<MyVo>> list = new ArrayList<>();// 创建计数器CountDownLatch latch = new CountDownLatch(list.size());for (List<MyVo> myVos : list) {executorService.submit(new MyThread(myVos, latch));}// 等待计数器执行完成再获取数据latch.await();} catch (InterruptedException e) {log.error(e.getMessage());}finally {if(executorService != null && !executorService.isShutdown()){executorService.shutdown();}}log.info("线程执行完毕!");// 执行后续程序}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。