package com.jimi.tracker.util.qiniu;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.xxx.route.client.pool.RouteClient;
import com.xxx.tracker.trans.netty.handler.AbstractHandler;
import com.xxx.tracker.util.ByteUtil;
import com.xxx.tracker.util.ConfigUtil;
import com.xxx.tracker.util.Des;
import com.xxx.tracker.util.cache.DcImeiAppIdCache;
import com.xxx.tracker.util.metrics.MonitorMetrics;
import com.xxx.utils.StringUtils;
import com.xxx.utils.Tuple;
import com.qiniu.common.QiniuException;
import com.qiniu.http.Response;
import com.qiniu.storage.Configuration;
import com.qiniu.storage.Region;
import com.qiniu.storage.UploadManager;
import com.qiniu.util.Auth;
import com.qiniu.util.StringMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
public class QiniuUtil {
// Class-wide SLF4J logger.
private static final Logger log = LoggerFactory.getLogger(QiniuUtil.class);
// Configuration key names; the values are read through ConfigUtil at the places where they are used.
private static final String QINIU_LOCAL_TEM_DIRECTORY = "qiniu.local.tmp.directory";
private static final String QINIU_LOCAL_ORDERING = "qiniu.local.ordering";
private static final String MAX_FILE_LENGTH_KEY = "max.file.length";
// URL scheme prefixes.
private static final String HTTP = "http://";
private static final String HTTPS = "https://";
// Further configuration key names (credentials, bucket, callback, token, download and video settings).
private static final String QINIU_ACCESS_KEY = "qiniu.accessKey";
private static final String QINIU_SECRET_KEY = "qiniu.secretKey";
private static final String APP_ID_KEY = "qiniu.app.id";
private static final String BUCKET = "qiniu.bucket";
private static final String CALLBACK_HOST = "qiniu.callback.host";
private static final String CALLBACK_URL = "qiniu.callback.url";
private static final String VIDEO_CALLBACK_URL = "qiniu.video.callback.url";
private static final String CALLBACK_BODY = "qiniu.callback.body";
private static final String CALLBACK_BODY_TYPE = "qiniu.callback.body.type";
private static final String CALLBACK_DES_KEY = "qiniu.callback.des.key";
private static final String TOKEN_EXPIRE_SECOND = "qiniu.token.expire.second";
private static final String DOWN_LOAD_URL = "qiniu.download.url";
private static final String VIDEO_OPS = "qiniu.video.ops";
private static final String VIDEO_PIPELINE = "qiniu.video.pipeline";
private static final String EXPIRED_DAY = "qiniu.expired.day";
private static final String VOICE_ENABLE_LOG = "qiniu.enable.log";
// The remaining constants are blank finals; each is assigned exactly once in the static initializer.
// Channel attribute key under which the per-channel QiniuConf is stored.
private static final AttributeKey<QiniuConf> FILE_CONF;
// Base directory for locally buffered temp files (configured directory + project name).
private static final String LOCAL_TEM_DIRECTORY;
// Formatters for the day directory name, the file-name timestamp and the token timestamp.
private static final DateTimeFormatter DATE_TIME_FORMATTER_DAY;
private static final DateTimeFormatter DATE_TIME_FORMATTER_FILE;
private static final DateTimeFormatter DATE_TIME_FORMATTER_TOKEN;
// Qiniu SDK configuration shared by every UploadManager created in this class.
private static final Configuration CONFIGURATION;
// Buffered packets per file: file name -> (packet index -> packet).
private static final Cache<String, Map<Integer, FileData>> FILE_INDEX_DATA_CACHE;
// Per-app-id CommonConf; an empty Optional is stored when loading failed.
private static final LoadingCache<String, Optional<CommonConf>> APP_ID_COMMON_CONF_MAP_CACHE;
// Upper bound for in-memory packet content, checked before a packet is accepted.
private static final int MAX_FILE_LENGTH;
// Qiniu credentials used to sign upload tokens.
private static final Auth AUTH;
// App id used when a device has none; also the prefix of the fallback entry in per-app config values.
private static final String DEFAULT_APP_ID;
/**
 * Fixed pool of 100 threads that performs the actual uploads, backed by a bounded queue of
 * 100000 tasks.
 *
 * <p>When the queue is full the submitting thread blocks until space is available instead of
 * dropping the task. If the submitting thread is interrupted while waiting, its interrupt flag
 * is restored and a {@link RejectedExecutionException} is raised so the caller learns that the
 * task was not enqueued (previously the task vanished silently after a printStackTrace()).
 */
private static final ThreadPoolExecutor UPLOAD_FILE_POOL = new ThreadPoolExecutor(
        100,
        100,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(100000),
        new ThreadFactoryBuilder().setNameFormat("pool-upload-consume-thread-%d").build(),
        (r, executor) -> {
            try {
                long startTime = System.currentTimeMillis();
                executor.getQueue().put(r);
                log.info("too many upload task,blocking the upload {} ms", System.currentTimeMillis() - startTime);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and report the rejection explicitly.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting to enqueue upload task", e);
            }
        });
/** Single-threaded scheduler whose worker thread is named {@code remove.tmp.file}. */
private static final ScheduledThreadPoolExecutor DELETE_TMP_FILE_THREAD_POOL =
        new ScheduledThreadPoolExecutor(1, task -> new Thread(task, "remove.tmp.file"));
// Static initializer: assigns every blank-final constant of the class, registers the metrics
// gauges and schedules the periodic temp-directory cleanup. Statement order matters because
// monitorCache() reads LOCAL_TEM_DIRECTORY and FILE_INDEX_DATA_CACHE.
static {
// Hands the dynamically read keys to ConfigUtil.apolloChange
// (presumably subscribes to runtime changes of these keys — confirm against ConfigUtil).
ConfigUtil.apolloChange(Sets.newHashSet(APP_ID_KEY, BUCKET, TOKEN_EXPIRE_SECOND, CALLBACK_HOST, CALLBACK_URL, VIDEO_CALLBACK_URL, CALLBACK_BODY, CALLBACK_BODY_TYPE, CALLBACK_DES_KEY, DOWN_LOAD_URL, VIDEO_OPS, VIDEO_PIPELINE, VOICE_ENABLE_LOG, EXPIRED_DAY, QINIU_LOCAL_ORDERING));
// Default limit is 4 MiB.
MAX_FILE_LENGTH = ConfigUtil.getInt(MAX_FILE_LENGTH_KEY, 4 * 1024 * 1024);
String accessKey = ConfigUtil.getString(QINIU_ACCESS_KEY, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
String secretKey = ConfigUtil.getString(QINIU_SECRET_KEY, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
AUTH = Auth.create(accessKey, secretKey);
// Project name = the first two dash-separated segments of the gate id, or the whole gate id
// when it has at most two segments.
String[] gateNames = AbstractHandler.gateId.split("-");
String projectName = null;
if (gateNames.length <= 2) {
projectName = AbstractHandler.gateId;
} else {
projectName = gateNames[0] + "-" + gateNames[1];
}
// Temp directory: configured base (guaranteed to end with '/') followed by the project name.
String tmpDirectory = ConfigUtil.getString(QINIU_LOCAL_TEM_DIRECTORY, "/tmp/qiniu/");
if (!tmpDirectory.endsWith("/")) {
tmpDirectory = tmpDirectory + "/";
}
LOCAL_TEM_DIRECTORY = tmpDirectory + projectName;
// Buffered packets are dropped after 20 minutes without access.
FILE_INDEX_DATA_CACHE = CacheBuilder.newBuilder().expireAfterAccess(20, TimeUnit.MINUTES).build();
// Per-app-id configuration, dropped after 30 minutes without access. A failed load is cached
// as Optional.empty(); note that Optional.of(null) also throws and therefore ends up there.
APP_ID_COMMON_CONF_MAP_CACHE = CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, Optional<CommonConf>>() {
@Override
public Optional<CommonConf> load(String appId) {
try {
return Optional.of(CommonConf.getCommonConf(appId));
} catch (Exception e) {
return Optional.empty();
}
}
});
DEFAULT_APP_ID = "TRACKER";
CONFIGURATION = new Configuration(Region.region0());
FILE_CONF = AttributeKey.valueOf("FILE_CONF");
DATE_TIME_FORMATTER_DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");
DATE_TIME_FORMATTER_FILE = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
DATE_TIME_FORMATTER_TOKEN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
monitorCache();
// Run the temp-directory cleanup every 3 hours, first run after 3 hours.
DELETE_TMP_FILE_THREAD_POOL.scheduleAtFixedRate(() -> deleteTmpFile(), 3, 3, TimeUnit.HOURS);
}
/**
 * Registers the gauges exposed by this class: number of cached files, bytes held in the
 * in-memory cache, bytes in the local temp directory (only when local ordering is enabled,
 * in which case the directory is created if missing) and number of channels with an upload
 * in progress.
 */
private static void monitorCache() {
    final String metricPrefix = "qiniu." + AbstractHandler.gateId;
    MonitorMetrics.registerGauge(metricPrefix + ".cache.imei.size", () -> FILE_INDEX_DATA_CACHE == null ? 0 : FILE_INDEX_DATA_CACHE.size());
    MonitorMetrics.registerGauge(metricPrefix + ".cache.data.size", () -> memoryCacheData());
    boolean localOrderingEnabled = ConfigUtil.getApolloValue(QINIU_LOCAL_ORDERING, true);
    if (localOrderingEnabled) {
        File tempRoot = new File(LOCAL_TEM_DIRECTORY);
        if (!tempRoot.exists()) {
            tempRoot.mkdirs();
        }
        MonitorMetrics.registerGauge(metricPrefix + ".tmp.file.data", () -> uploadLocalFileLength());
    }
    MonitorMetrics.registerGauge(metricPrefix + ".upload.imei.size", () -> uploadImeiSize());
}
/**
 * Sums the byte length of every packet content currently held in {@code FILE_INDEX_DATA_CACHE}.
 *
 * <p>A plain loop is used on purpose: the per-file maps are small, so a parallel stream on the
 * shared fork-join pool only added overhead to a metrics gauge.
 *
 * @return total number of content bytes in the cache, or 0 when the cache is empty or the
 *         traversal fails
 */
private static long memoryCacheData() {
    // Last cache key visited, reported in the warning below when the traversal fails.
    String imei = null;
    try {
        if (FILE_INDEX_DATA_CACHE == null || FILE_INDEX_DATA_CACHE.size() <= 0) {
            return 0;
        }
        long len = 0;
        for (Map.Entry<String, Map<Integer, FileData>> imeiMapEntry : FILE_INDEX_DATA_CACHE.asMap().entrySet()) {
            imei = imeiMapEntry.getKey();
            Map<Integer, FileData> indexFileDataMap = imeiMapEntry.getValue();
            if (indexFileDataMap == null || indexFileDataMap.isEmpty()) {
                continue;
            }
            for (FileData fileData : indexFileDataMap.values()) {
                // Packets written to the local temp file have their content detached (null).
                if (fileData.content != null) {
                    len += fileData.content.length;
                }
            }
        }
        return len;
    } catch (Exception e) {
        log.warn("统计缓存数据错误 imei:{}", imei, e);
        return 0;
    }
}
/**
 * Reports how many bytes of temp files currently sit under {@code LOCAL_TEM_DIRECTORY}.
 *
 * <p>Temp files are written into per-day subdirectories, and {@link File#length()} is
 * unspecified for directories, so the tree is walked recursively and only regular files are
 * counted.
 *
 * @return total size in bytes of all regular files below the temp directory, or 0 when the
 *         directory is missing, empty or cannot be read
 */
private static long uploadLocalFileLength() {
    try {
        return totalFileLength(new File(LOCAL_TEM_DIRECTORY));
    } catch (Exception e) {
        log.warn("统计文件大小错误", e);
        return 0;
    }
}

/** Recursively sums the lengths of the regular files at or below {@code node}. */
private static long totalFileLength(File node) {
    if (node.isFile()) {
        return node.length();
    }
    File[] children = node.listFiles();
    if (children == null) {
        // Not a directory, or the directory could not be listed.
        return 0;
    }
    long total = 0;
    for (File child : children) {
        total += totalFileLength(child);
    }
    return total;
}
/**
 * Removes the temp sub-directory named after the day two days before now, if it exists.
 * Any failure is logged and otherwise ignored.
 */
private static void deleteTmpFile() {
    try {
        String expiredDay = LocalDateTime.now().minusDays(2).format(DATE_TIME_FORMATTER_DAY);
        File expiredDir = new File(LOCAL_TEM_DIRECTORY + "/" + expiredDay);
        if (expiredDir.exists()) {
            deleteDir(expiredDir);
        }
    } catch (Exception e) {
        log.warn("删除文件夹失败", e);
    }
}
/**
 * Deletes {@code dir}; when it is a directory its content is deleted first, depth-first.
 * Deletion continues past individual failures, each of which is logged where it occurs.
 *
 * @param dir file or directory to delete
 * @return {@code true} when {@code dir} itself was deleted, {@code false} otherwise
 */
private static boolean deleteDir(File dir) {
    if (dir.isDirectory()) {
        // list() returns null when the directory cannot be read (I/O error, or removed concurrently).
        String[] children = dir.list();
        if (children != null) {
            for (String child : children) {
                deleteDir(new File(dir, child));
            }
        }
    }
    boolean deleted = dir.delete();
    if (!deleted) {
        log.warn("Failed to delete {}", dir.getPath());
    }
    return deleted;
}
/**
 * Counts the active channels whose {@code QiniuConf} attribute currently carries a file name,
 * i.e. the devices with an upload in progress.
 *
 * @return number of such channels, or 0 when there are none or counting fails
 */
private static int uploadImeiSize() {
    try {
        Map<String, Channel> channels = RouteClient.getAllChannel();
        if (channels == null || channels.isEmpty()) {
            return 0;
        }
        int uploading = 0;
        for (Channel candidate : channels.values()) {
            if (candidate == null || !candidate.isActive() || candidate.attr(FILE_CONF) == null) {
                continue;
            }
            QiniuConf conf = candidate.attr(FILE_CONF).get();
            if (conf != null && !StringUtils.isEmpty(conf.fileName)) {
                uploading++;
            }
        }
        return uploading;
    } catch (Exception e) {
        log.warn("统计上传IMEI大小错误", e);
        return 0;
    }
}
/** Uploads a packet with no file type, no signature function and no callback. */
public static List<Integer> upload(ChannelHandlerContext ctx, FileData fileData) {
return upload(ctx, null, fileData, null, null);
}
/** Uploads a packet with a custom signature function; no file type and no callback. */
public static List<Integer> upload(ChannelHandlerContext ctx, FileData fileData, Function<FileData, String> verifyFileFunction) {
return upload(ctx, null, fileData, verifyFileFunction);
}
/** Uploads a packet with an explicit file type and a custom signature function; no callback. */
public static List<Integer> upload(ChannelHandlerContext ctx, FileTypeEnum fileTypeEnum, FileData fileData, Function<FileData, String> verifyFileFunction) {
return upload(ctx, fileTypeEnum, fileData, verifyFileFunction, null);
}
/** Uploads a packet with an explicit file type; no signature function and no callback. */
public static List<Integer> upload(ChannelHandlerContext ctx, FileTypeEnum fileTypeEnum, FileData fileData) {
return upload(ctx, fileTypeEnum, fileData, null, null);
}
/** Uploads a packet and reports the outcome to {@code biPredicate}; no file type, no signature function. */
public static List<Integer> upload(ChannelHandlerContext ctx, FileData fileData, BiPredicate<Boolean, QiniuConf> biPredicate) {
return upload(ctx, null, fileData, null, biPredicate);
}
/** Uploads a packet with a custom signature function and reports the outcome to {@code biPredicate}; no file type. */
public static List<Integer> upload(ChannelHandlerContext ctx, FileData fileData, Function<FileData, String> verifyFileFunction, BiPredicate<Boolean, QiniuConf> biPredicate) {
return upload(ctx, null, fileData, verifyFileFunction, biPredicate);
}
/** Uploads a packet with an explicit file type and reports the outcome to {@code biPredicate}; no signature function. */
public static List<Integer> upload(ChannelHandlerContext ctx, FileTypeEnum fileTypeEnum, FileData fileData, BiPredicate<Boolean, QiniuConf> biPredicate) {
return upload(ctx, fileTypeEnum, fileData, null, biPredicate);
}
/**
 * Core upload entry point: processes one {@link FileData} packet for the device bound to {@code ctx}.
 *
 * <p>When {@code fileData.promptlyUpload} is null or false the packet is buffered in
 * {@code FILE_INDEX_DATA_CACHE} (and, in local-ordering mode, its bytes are appended to a local
 * temp file) until the number of buffered packets reaches {@code fileData.totalCount}; the
 * assembled file is then submitted to the upload pool. Otherwise {@code fileData.uploadFile},
 * or failing that {@code fileData.content}, is submitted for upload straight away.
 *
 * @param ctx channel context used to resolve the IMEI and the per-channel {@code QiniuConf}
 * @param fileTypeEnum file type to apply; may be null
 * @param fileData packet to process
 * @param verifyFileFunction optional function producing the file signature checked by
 *        {@code verifyFile}; may be null
 * @param biPredicate optional callback receiving the upload outcome and the {@code QiniuConf};
 *        may be null
 * @return the indices in {@code 1..totalCount} that are still missing when the packet whose
 *         index equals {@code totalCount} arrives before the file is complete; {@code null}
 *         in every other case, including failures
 */
public static List<Integer> upload(ChannelHandlerContext ctx, FileTypeEnum fileTypeEnum, FileData fileData, Function<FileData, String> verifyFileFunction, BiPredicate<Boolean, QiniuConf> biPredicate) {
QiniuConf qiniuConf = null;
String imei = null;
try {
imei = getImei(ctx);
} catch (Exception e) {
// Without an IMEI nothing can be done; note that biPredicate is not invoked on this path.
log.warn("上传文件,获取IMEI号失败", e);
return null;
}
try {
qiniuConf = qiniuConf(ctx, fileTypeEnum, fileData);
// _1: verifyFile reported a change (file type or signature); _2: the signature itself changed.
Tuple.TwoTuple<Boolean, Boolean> tuple2 = verifyFile(qiniuConf, fileTypeEnum, fileData, verifyFileFunction);
if (tuple2._1()) {
if (tuple2._2()) {
// Signature changed: discard the state of the previous file, then rebuild name and token.
after(ctx);
qiniuConf.fileTypeEnum(fileTypeEnum).builder();
} else {
// Only the file type changed: rebuild and move buffered packets to the new cache key if needed.
qiniuConf.builder();
updateCache(ctx);
}
}
if (fileData.promptlyUpload == null || !fileData.promptlyUpload) {
// Buffered mode. The call below fixes qiniuConf.localOrdering for this file.
isWriteLocalOrderingFile(fileData, qiniuConf);
byte[] data = null;
if (qiniuConf.localOrdering) {
// The bytes go to the temp file, so detach them from the packet kept in the cache.
data = fileData.content;
fileData.currentLength = data.length;
fileData.content = null;
}
Map<Integer, FileData> copyMap = null;
String fileName = null;
File temFile = null;
// Packet handling is serialized per device by locking on the interned IMEI string.
synchronized (imei.intern()) {
Map<Integer, FileData> indexDataMap = FILE_INDEX_DATA_CACHE.getIfPresent(qiniuConf.fileName);
if (indexDataMap == null) {
if (fileData.totalCount > 0) {
indexDataMap = Maps.newLinkedHashMapWithExpectedSize(fileData.totalCount);
} else {
indexDataMap = Maps.newLinkedHashMapWithExpectedSize(30);
}
FILE_INDEX_DATA_CACHE.put(qiniuConf.fileName, indexDataMap);
}
// Only the first occurrence of an index is appended to the temp file.
if (qiniuConf.localOrdering && !indexDataMap.containsKey(fileData.currentIndex) && data != null && data.length > 0) {
writeLocalFile(qiniuConf, data, fileData);
}
// NOTE(review): a repeated index replaces the FileData stored earlier for that index.
indexDataMap.put(fileData.currentIndex, fileData);
if (ConfigUtil.getApolloValue(VOICE_ENABLE_LOG, true)) {
log.info("imei: {} 录音数据包总数:{} 已经上报的数据量:{} 当前上报的索引:{}", qiniuConf.imei, fileData.totalCount, indexDataMap.size(), fileData.currentIndex);
}
if (indexDataMap.size() < fileData.totalCount) {
// File incomplete. If this packet carries the last index, tell the caller which ones are missing.
if (fileData.totalCount > 0 && fileData.currentIndex == fileData.totalCount) {
List<Integer> supplementalIdList = Lists.newArrayListWithExpectedSize(fileData.totalCount);
for (int i = 1; i <= fileData.totalCount; i++) {
if (indexDataMap.containsKey(i)) {
continue;
}
supplementalIdList.add(i);
}
return supplementalIdList.isEmpty() ? null : supplementalIdList;
}
return null;
}
// File complete: snapshot what the upload needs, then reset the per-channel state
// (after() clears fileName and file) while still holding the lock.
copyMap = Maps.newHashMap(indexDataMap);
fileName = qiniuConf.fileName;
if (qiniuConf.file != null) {
temFile = qiniuConf.file;
}
after(ctx);
}
// Packets in ascending index order.
List<FileData> sortDataList = copyMap.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue).collect(Collectors.toList());
if (temFile != null) {
// Upload the temp file; the ordered list lets the uploader rearrange it if needed.
asyncUpload(biPredicate, qiniuConf, fileName, temFile, sortDataList);
} else {
// No temp file: concatenate the in-memory packet contents in index order.
byte[] content = new byte[0];
for (FileData querFileData : sortDataList) {
content = ByteUtil.concat(content, querFileData.content);
}
asyncUpload(biPredicate, qiniuConf, fileName, content, null);
}
} else if (fileData.uploadFile != null) {
// Immediate upload of a ready-made file.
asyncUpload(biPredicate, qiniuConf, fileData.uploadFile);
after(ctx);
} else {
// Immediate upload of the packet bytes.
asyncUpload(biPredicate, qiniuConf, fileData.content);
after(ctx);
}
return null;
} catch (Exception e) {
// Report the failure to the callback and reset the per-channel state.
log.warn("文件上传失败 imei:{}", imei, e);
callBack(biPredicate, qiniuConf, false);
after(ctx);
return null;
}
}
/**
 * Submits {@code data} for upload under the file name currently held by {@code qiniuConf},
 * without an ordered packet list.
 */
private static void asyncUpload(BiPredicate<Boolean, QiniuConf> biPredicate, QiniuConf qiniuConf, Object data) {
asyncUpload(biPredicate, qiniuConf, qiniuConf.fileName, data, null);
}
/**
 * Queues an upload on {@code UPLOAD_FILE_POOL} and reports its outcome through
 * {@code callBack}. IMEI and token are captured before queuing because the caller may
 * reset {@code qiniuConf} before the task runs.
 *
 * @param biPredicate optional outcome callback; may be null
 * @param qiniuConf configuration the upload belongs to
 * @param fileName remote file name (key) to upload under
 * @param data a {@code File} or a {@code byte[]}
 * @param sortDataList packets in index order, or null when no reordering is required
 */
private static void asyncUpload(BiPredicate<Boolean, QiniuConf> biPredicate, QiniuConf qiniuConf, String fileName, Object data, List<FileData> sortDataList) {
    final String deviceImei = qiniuConf.imei;
    final String uploadToken = qiniuConf.token;
    Runnable uploadTask = () -> callBack(biPredicate, qiniuConf, uploadQiniu(deviceImei, fileName, uploadToken, data, sortDataList));
    UPLOAD_FILE_POOL.submit(uploadTask);
}
/**
 * Compares the signature of the incoming packet with the one recorded on {@code qiniuConf}
 * and records the new signature / file type where they differ.
 *
 * <p>The signature is the result of {@code verifyFileFunction}, or
 * {@code totalCount + "-" + length} when the function is null or returns an empty value.
 *
 * @return a tuple {@code (changed, signatureChanged)}: {@code (true, true)} when a previously
 *         recorded signature differs from the new one; {@code (true, false)} when only the
 *         file type differs; {@code (false, false)} otherwise, including null arguments
 */
public static Tuple.TwoTuple<Boolean, Boolean> verifyFile(QiniuConf qiniuConf, FileTypeEnum fileTypeEnum, FileData fileData, Function<FileData, String> verifyFileFunction) {
    if (qiniuConf == null || fileData == null) {
        return new Tuple.TwoTuple<>(false, false);
    }
    String signature = verifyFileFunction == null ? null : verifyFileFunction.apply(fileData);
    if (StringUtils.isEmpty(signature)) {
        signature = fileData.totalCount + "-" + fileData.length;
    }
    boolean nothingRecorded = StringUtils.isEmpty(qiniuConf.verifyFile);
    if (!nothingRecorded && !qiniuConf.verifyFile.equals(signature)) {
        log.warn("文件属性以改变,只能丢弃前面的数据包信息 old: {} new:{}", qiniuConf.verifyFile, signature);
        qiniuConf.verifyFile = signature;
        return new Tuple.TwoTuple<>(true, true);
    }
    if (nothingRecorded) {
        qiniuConf.verifyFile = signature;
    }
    boolean typeChanged = fileTypeEnum != null && !fileTypeEnum.equals(qiniuConf.fileTypeEnum);
    if (typeChanged) {
        qiniuConf.fileTypeEnum = fileTypeEnum;
    }
    return new Tuple.TwoTuple<>(typeChanged, false);
}
/**
 * Invokes the optional outcome callback.
 *
 * @return {@code true} when there is no callback; otherwise the callback's result, or
 *         {@code false} when the callback throws
 */
private static boolean callBack(BiPredicate<Boolean, QiniuConf> biPredicate, QiniuConf qiniuConf, boolean uploadResult) {
    boolean outcome = true;
    if (biPredicate != null) {
        try {
            outcome = biPredicate.test(uploadResult, qiniuConf);
        } catch (Exception e) {
            log.warn("回调失败 qiniuConf:{}", qiniuConf, e);
            outcome = false;
        }
    }
    return outcome;
}
/**
 * Resets the upload state of the channel behind {@code ctx}; failures are logged, not thrown.
 */
public static void after(ChannelHandlerContext ctx) {
    try {
        QiniuConf current = qiniuConf(ctx, null);
        after(current);
    } catch (Exception e) {
        log.warn("回收资源错误 imei:{}", AbstractHandler.getImeiByCtx(ctx), e);
    }
}
/**
 * Drops the buffered packets of the current file and clears the per-file fields of
 * {@code qiniuConf} so the next packet starts a new file.
 *
 * <p>Safe to call repeatedly: the cache is only touched while a file name is present, because
 * Guava's {@code Cache.invalidate} rejects a null key with a NullPointerException.
 *
 * @param qiniuConf configuration to reset; null is ignored
 */
private static void after(QiniuConf qiniuConf) {
    if (qiniuConf == null) {
        return;
    }
    if (qiniuConf.fileName != null) {
        FILE_INDEX_DATA_CACHE.invalidate(qiniuConf.fileName);
    }
    qiniuConf.fileName = null;
    qiniuConf.offset = 0;
    qiniuConf.file = null;
    qiniuConf.localOrdering = null;
    qiniuConf.downloadUrl = null;
    qiniuConf.fileTypeEnum = null;
    qiniuConf.fileNameTime = null;
    qiniuConf.verifyFile = null;
}
/**
 * Rebuilds the channel's {@code QiniuConf} and, when that changed the file name, moves the
 * already buffered packets to the new cache key. Failures are logged, not thrown.
 */
private static void updateCache(ChannelHandlerContext ctx) {
    try {
        QiniuConf conf = qiniuConf(ctx, null);
        String previousName = conf.fileName;
        if (StringUtils.isEmpty(previousName)) {
            return;
        }
        conf.builder();
        String currentName = conf.fileName;
        if (previousName.equals(currentName)) {
            return;
        }
        Map<Integer, FileData> buffered = FILE_INDEX_DATA_CACHE.getIfPresent(previousName);
        if (buffered != null && !buffered.isEmpty()) {
            FILE_INDEX_DATA_CACHE.put(currentName, buffered);
            FILE_INDEX_DATA_CACHE.invalidate(previousName);
        }
    } catch (Exception e) {
        log.warn("回收资源错误 imei:{}", AbstractHandler.getImeiByCtx(ctx), e);
    }
}
/**
 * Tells whether packets are currently buffered for the file the channel is uploading.
 *
 * @return {@code true} when a cache entry exists for the channel's current file name;
 *         {@code false} otherwise, including when the lookup fails
 */
public static boolean hasRecordMsg(ChannelHandlerContext ctx) {
    try {
        String currentName = qiniuConf(ctx, null).fileName;
        return !StringUtils.isEmpty(currentName) && FILE_INDEX_DATA_CACHE.getIfPresent(currentName) != null;
    } catch (Exception e) {
        log.warn("判断录音信息错误 imei:{}", AbstractHandler.getImeiByCtx(ctx), e);
        return false;
    }
}
/**
 * Drops the buffered packets of the file the channel is currently uploading, leaving the
 * channel's {@code QiniuConf} itself untouched.
 *
 * <p>A null context, a channel that never uploaded anything (no {@code QiniuConf} attribute)
 * and a configuration without a file name are all treated as "nothing to remove" rather than
 * surfacing as a logged NullPointerException.
 *
 * @param ctx channel context; may be null
 */
public static void removeCacheFile(ChannelHandlerContext ctx) {
    if (ctx == null) {
        return;
    }
    try {
        QiniuConf qiniuConf = ctx.channel().attr(FILE_CONF).get();
        if (qiniuConf == null || StringUtils.isEmpty(qiniuConf.fileName)) {
            return;
        }
        FILE_INDEX_DATA_CACHE.invalidate(qiniuConf.fileName);
    } catch (Exception e) {
        log.warn("回收资源错误 imei:{}", AbstractHandler.getImeiByCtx(ctx), e);
    }
}
/**
 * Resets the upload state stored on {@code channel}: buffered packets are dropped and the
 * per-file fields of its {@code QiniuConf} are cleared. Failures are logged, not thrown.
 *
 * @param channel channel to clean up; null is ignored
 */
public static void removeCacheFile(Channel channel) {
    if (channel == null) {
        return;
    }
    String imei = null;
    try {
        imei = channel.attr(AbstractHandler.loginImeiKey).get();
        QiniuConf attached = channel.attr(FILE_CONF).get();
        after(attached);
    } catch (Exception e) {
        log.warn("回收资源错误 imei:{}", imei, e);
    }
}
/**
 * Performs the blocking upload to Qiniu.
 *
 * <p>{@code data} is either a local {@link File} or a {@code byte[]}. For a file, a non-null
 * {@code sortDataList} makes the file be rearranged into packet order first; the local file is
 * removed once the upload counts as successful, which includes the "file exists" response.
 *
 * @param imei device identifier, used for logging only
 * @param fileName remote file name (key)
 * @param token upload token
 * @param data {@code File} or {@code byte[]} to upload
 * @param sortDataList packets in index order, or null when no reordering is required
 * @return {@code true} when the upload succeeded or the file already exists remotely,
 *         {@code false} otherwise
 */
public static boolean uploadQiniu(final String imei, final String fileName, final String token, Object data, List<FileData> sortDataList) {
    // Local file backing the upload, if any; removed once the upload counts as successful.
    File localFile = null;
    try {
        if (data == null) {
            log.error("上传文件失败 imei:{} 数据为空", imei);
            return false;
        }
        Response response;
        if (data instanceof File) {
            File file = (File) data;
            if (!file.isFile()) {
                // The IMEI placeholder used to receive the file path; log both explicitly.
                log.error("上传文件失败 imei:{} 文件不存在 path:{}", imei, file.getPath());
                return false;
            }
            if (sortDataList != null) {
                file = organizeFiles(file, sortDataList);
            }
            localFile = file;
            response = new UploadManager(CONFIGURATION).put(file, fileName, token);
        } else {
            response = new UploadManager(CONFIGURATION).put((byte[]) data, fileName, token);
        }
        log.info("文件上传信息 fileName: {} response: {}", fileName, response.getInfo());
        removeUploadedFile(imei, localFile);
        return true;
    } catch (QiniuException e) {
        if (e.response != null && e.response.getInfo().contains("file exists")) {
            // The remote side already has this key, so the local copy is no longer needed.
            removeUploadedFile(imei, localFile);
            return true;
        }
        log.error("上传文件失败 imei:{} case: {}", imei, (null == e.response ? "" : e.response.toString()), e);
        return false;
    } catch (Exception e) {
        log.error("上传文件失败 imei:{} ", imei, e);
        return false;
    }
}

/** Deletes the local copy of an uploaded file and logs when that is not possible. */
private static void removeUploadedFile(String imei, File uploaded) {
    if (uploaded != null && !uploaded.delete()) {
        log.warn("Failed to delete uploaded temp file imei:{} path:{}", imei, uploaded.getPath());
    }
}
/**
 * Validates {@code fileData} and then resolves the channel's {@code QiniuConf}.
 *
 * @throws Exception when {@code fileData} is null, carries neither content nor a file, or its
 *         content exceeds {@code MAX_FILE_LENGTH} while local ordering is not in effect; also
 *         whatever the two-argument {@code qiniuConf} overload throws
 */
@NotNull
private static QiniuConf qiniuConf(ChannelHandlerContext ctx, FileTypeEnum fileTypeEnum, FileData fileData) throws Exception {
    if (fileData == null) {
        throw new Exception("上传fileData为空");
    }
    byte[] payload = fileData.content;
    if (payload == null && fileData.uploadFile == null) {
        throw new Exception("上传数据为空");
    }
    boolean oversized = payload != null && payload.length > MAX_FILE_LENGTH;
    if (oversized && !isWriteLocalOrderingFile(fileData, null)) {
        throw new Exception("大于4M的文件,数据包必须要顺序,上报时会直接写本地磁盘,最后直接上传文件");
    }
    return qiniuConf(ctx, fileTypeEnum);
}
/**
 * Returns the {@code QiniuConf} attached to the channel, creating and attaching one on first
 * use, applying a changed file type and making sure file name and token are populated.
 *
 * @param ctx context of an active channel
 * @param fileTypeEnum file type to apply; null leaves the current type untouched
 * @return the channel's configuration with non-empty file name and token
 * @throws Exception when the channel is gone or inactive, the IMEI cannot be resolved, or
 *         file name / token are still empty after building
 */
public static QiniuConf qiniuConf(ChannelHandlerContext ctx, FileTypeEnum fileTypeEnum) throws Exception {
    boolean connected = ctx != null && ctx.channel() != null && ctx.channel().isActive();
    if (!connected) {
        throw new Exception("操作七牛云时,连接已断开,上传失败");
    }
    String imei = getImei(ctx);
    QiniuConf conf = ctx.channel().attr(FILE_CONF).get();
    if (conf == null) {
        conf = QiniuConf.build(imei);
        ctx.channel().attr(FILE_CONF).set(conf);
    }
    boolean typeChanged = fileTypeEnum != null && (conf.fileTypeEnum == null || !fileTypeEnum.equals(conf.fileTypeEnum));
    if (typeChanged) {
        conf.fileTypeEnum(fileTypeEnum);
        // An existing file name embeds the type, so it has to be regenerated.
        if (StringUtils.isNotEmpty(conf.fileName)) {
            conf.updateFileName(fileTypeEnum);
        }
    }
    boolean incomplete = StringUtils.isEmpty(conf.fileName) || StringUtils.isEmpty(conf.token);
    if (incomplete) {
        conf.builder();
    }
    if (StringUtils.isEmpty(conf.token)) {
        throw new Exception("获取TOKEN失败");
    }
    if (StringUtils.isEmpty(conf.fileName)) {
        throw new Exception("上传文件名不能为空");
    }
    return conf;
}
/**
 * Resolves the IMEI bound to {@code ctx}.
 *
 * @throws Exception when {@code ctx} is null or no IMEI is bound to it
 */
@NotNull
private static String getImei(ChannelHandlerContext ctx) throws Exception {
    if (ctx == null) {
        throw new Exception("通道已断开");
    }
    String resolved = AbstractHandler.getImeiByCtx(ctx);
    if (!StringUtils.isEmpty(resolved)) {
        return resolved;
    }
    throw new Exception("上传IMEI为空,请检查设备是否已登录");
}
/**
 * Reads the declared field named {@code key} from {@code region} via reflection.
 *
 * @return the field value, or {@code null} when the field is missing or cannot be read
 */
private static Object getRegionString(Region region, String key) {
    Class<?> regionType = region.getClass();
    try {
        Field target = regionType.getDeclaredField(key);
        target.setAccessible(true);
        return target.get(region);
    } catch (Exception e) {
        log.error("Server get region error:", e);
        return null;
    }
}
/**
 * Appends {@code data} to the local temp file of the current upload, creating the file (and
 * its day directory) on first use, and records on {@code fileData} which byte range of the
 * temp file the packet occupies.
 *
 * @param qiniuConf configuration holding the temp file handle and the current write offset
 * @param data bytes to append; null or empty only ensures the file exists
 * @param fileData packet the bytes belong to
 * @return {@code true} when the file exists and the bytes (if any) were written,
 *         {@code false} on any I/O failure
 */
private static boolean writeLocalFile(QiniuConf qiniuConf, byte[] data, FileData fileData) {
    if (qiniuConf.file == null) {
        qiniuConf.file = new File(LOCAL_TEM_DIRECTORY + "/" + qiniuConf.fileDay + "/" + qiniuConf.fileName);
        try {
            File parent = qiniuConf.file.getParentFile();
            // Re-check exists() after a failed mkdirs(): another thread may have created it meanwhile.
            if (!parent.exists() && !parent.mkdirs() && !parent.exists()) {
                log.warn("创建目录失败 path:{}", parent.getPath());
                return false;
            }
            qiniuConf.file.createNewFile();
        } catch (IOException e) {
            log.warn("创建文件失败", e);
            return false;
        }
    }
    if (data == null || data.length <= 0) {
        return true;
    }
    try (RandomAccessFile out = new RandomAccessFile(qiniuConf.file, "rw")) {
        out.seek(qiniuConf.offset);
        out.write(data);
        fileData.adjustByteOrder = new AdjustByteOrder(qiniuConf.offset, qiniuConf.offset + data.length);
        qiniuConf.offset = qiniuConf.offset + data.length;
        return true;
    } catch (FileNotFoundException e) {
        // Pass the exception itself so the stack trace is not lost.
        log.error("Cache File {} not found, Error: {}", qiniuConf.fileName, e.getMessage(), e);
    } catch (IOException e) {
        log.error("Cache File {} Error: {}", qiniuConf.fileName, e.getMessage(), e);
    }
    return false;
}
/**
 * Rewrites {@code tempFile} so that its packets appear in the order of {@code sortDataList}.
 *
 * <p>Packets are appended to the temp file in arrival order; each packet remembers the byte
 * range it occupies. When the ranges are already in list order nothing is rewritten. Otherwise
 * the ranges are copied, in list order, into a sibling file named {@code <name>_tmp<ext>}, the
 * original is deleted and the new file is returned.
 *
 * @param tempFile file holding the packets in arrival order; may be null
 * @param sortDataList packets in the desired order
 * @return the file to upload: the reordered copy, or {@code tempFile} itself when no
 *         reordering is needed, range information is missing, or the copy fails;
 *         {@code null} only when {@code tempFile} is null
 */
private static File organizeFiles(File tempFile, List<FileData> sortDataList) {
    if (tempFile == null) {
        return null;
    }
    if (sortDataList == null || sortDataList.isEmpty()) {
        return tempFile;
    }
    // Without range information for every packet the file cannot be rearranged; checking all
    // packets up front also keeps the copy loop below free of null dereferences.
    for (FileData fileData : sortDataList) {
        if (fileData.adjustByteOrder == null) {
            return tempFile;
        }
    }
    boolean isOrganizeFlag = false;
    long firstIndex = 0;
    for (FileData fileData : sortDataList) {
        if (fileData.adjustByteOrder.isOrganize(firstIndex)) {
            isOrganizeFlag = true;
            break;
        }
        firstIndex = firstIndex + (fileData.adjustByteOrder.tempEndIndex - fileData.adjustByteOrder.tempFirstIndex);
    }
    if (!isOrganizeFlag) {
        return tempFile;
    }
    // Build the sibling name from the parent directory and the bare file name, so a name without
    // an extension or a directory containing the file name cannot corrupt the path.
    String fileName = tempFile.getName();
    int indexLast = fileName.lastIndexOf('.');
    String tmpFileName = indexLast < 0
            ? fileName + "_tmp"
            : fileName.substring(0, indexLast) + "_tmp" + fileName.substring(indexLast);
    File newTempFile = new File(tempFile.getParentFile(), tmpFileName);
    try {
        newTempFile.createNewFile();
    } catch (IOException e) {
        log.warn("创建临时文件失败 path:{}", tempFile.getPath(), e);
        return tempFile;
    }
    try (RandomAccessFile sourceFile = new RandomAccessFile(tempFile, "r");
         FileChannel sourceChannel = sourceFile.getChannel();
         RandomAccessFile targetFile = new RandomAccessFile(newTempFile, "rw")) {
        // Start from an empty file in case a copy from an earlier attempt was left behind.
        targetFile.setLength(0);
        long offset = 0;
        for (FileData fileData : sortDataList) {
            AdjustByteOrder range = fileData.adjustByteOrder;
            ByteBuffer buffer = ByteBuffer.allocate((int) (range.tempEndIndex - range.tempFirstIndex));
            sourceChannel.position(range.tempFirstIndex);
            // A single read() may return fewer bytes than requested: keep reading until the
            // range is complete or the end of the file is reached.
            while (buffer.hasRemaining()) {
                if (sourceChannel.read(buffer) == -1) {
                    break;
                }
            }
            byte[] bytes = buffer.array();
            targetFile.seek(offset);
            targetFile.write(bytes);
            offset = offset + bytes.length;
        }
    } catch (IOException e) {
        log.warn("重整文件失败 path:{}", tempFile.getPath(), e);
        // Do not leave a partial copy behind.
        if (!newTempFile.delete()) {
            log.warn("Failed to delete {}", newTempFile.getPath());
        }
        return tempFile;
    }
    if (!tempFile.delete()) {
        log.warn("Failed to delete {}", tempFile.getPath());
    }
    return newTempFile;
}
/**
 * Decides whether packets of the current file are written to a local temp file.
 *
 * <p>Without a {@code qiniuConf} the answer is computed on the fly: {@code true} for a
 * positive packet offset, otherwise the configured default. With a {@code qiniuConf} the
 * first decision is stored on it and reused; it is taken, in this order, from the packet's
 * own {@code localOrdering} flag, the configured default (negative offset), or whether the
 * offset is positive.
 */
private static boolean isWriteLocalOrderingFile(FileData fileData, QiniuConf qiniuConf) {
    if (qiniuConf == null) {
        boolean positiveOffset = fileData != null && fileData.offset > 0;
        return positiveOffset ? true : ConfigUtil.getApolloValue(QINIU_LOCAL_ORDERING, true);
    }
    if (qiniuConf.localOrdering != null) {
        return qiniuConf.localOrdering;
    }
    if (fileData.localOrdering != null) {
        qiniuConf.localOrdering = fileData.localOrdering;
    } else if (fileData.offset < 0) {
        qiniuConf.localOrdering = ConfigUtil.getApolloValue(QINIU_LOCAL_ORDERING, true);
    } else {
        qiniuConf.localOrdering = fileData.offset > 0;
    }
    return qiniuConf.localOrdering;
}
/**
 * Builds the download URL of {@code fileName} on the configured download domain, using the
 * {@code http://} scheme.
 */
public static String downLoad(String fileName) {
    String downloadDomain = HTTP + ConfigUtil.getApolloValue(DOWN_LOAD_URL, "xxxx.xxxxxx.xxxxx");
    return downLoad(downloadDomain, fileName);
}
/**
 * Joins a download domain and a file name with a single {@code /}.
 *
 * @param downDomain domain including the scheme, without trailing slash
 * @param fileName remote file name
 * @return {@code downDomain + "/" + fileName}
 */
public static String downLoad(String downDomain, String fileName) {
    StringBuilder url = new StringBuilder();
    url.append(downDomain).append("/").append(fileName);
    return url.toString();
}
/**
 * Upload settings shared by all devices of one app id: bucket, upload host, callback
 * endpoints and body, token lifetime and video post-processing options.
 *
 * <p>Configuration values may be either a single value used for every app id, or a
 * comma-separated list of {@code APPID|value} entries; see {@link #getValueByAppId}.
 */
private static class CommonConf {
    private String host;
    private String bucket;
    private String callbackHost;
    private String callbackUrl;
    private String callbackVideoUrl;
    private String callbackBody;
    private String callbackBodyType;
    private int expiresSecond;
    private int retryTimes;
    private String callbackDesKey;
    private String videoOps;
    private String videoPipeline;

    private CommonConf() {
    }

    /**
     * Loads the settings for {@code appId} from the current configuration.
     *
     * @param appId application id; empty means the default app id
     * @return the populated settings, or {@code null} when the bucket is not configured or
     *         loading fails (the failure is logged together with its cause)
     */
    private static CommonConf getCommonConf(String appId) {
        try {
            String bucket = ConfigUtil.getApolloValue(BUCKET, "");
            if (StringUtils.isEmpty(bucket)) {
                log.error("上传的Bucket不能为空");
                return null;
            }
            CommonConf commonConf = new CommonConf();
            // The region does not expose its upload hosts, so they are read reflectively.
            @SuppressWarnings("unchecked")
            List<String> srcUpHosts = (List<String>) getRegionString(CONFIGURATION.region, "srcUpHosts");
            String domain = Configuration.defaultRsHost;
            if (null != srcUpHosts && !srcUpHosts.isEmpty()) {
                domain = srcUpHosts.get(0);
            }
            String callbackHost = ConfigUtil.getApolloValue(CALLBACK_HOST, "xxxxxxxxx");
            String callbackUrl = ConfigUtil.getApolloValue(CALLBACK_URL, "xxxxxxxxx");
            String callbackVideoUrl = ConfigUtil.getApolloValue(VIDEO_CALLBACK_URL, "xxxxxxxx");
            String callbackBody = ConfigUtil.getApolloValue(CALLBACK_BODY, "xxxxxxxx");
            String callbackBodyType = ConfigUtil.getApolloValue(CALLBACK_BODY_TYPE, "application/json");
            String callbackDesKey = ConfigUtil.getApolloValue(CALLBACK_DES_KEY, "xxxxxx");
            String videoOps = ConfigUtil.getApolloValue(VIDEO_OPS, "xxxxxxxx");
            String videoPipeline = ConfigUtil.getApolloValue(VIDEO_PIPELINE, "matrix");
            // The configured value is multiplied by 60 before being used as the token lifetime.
            int expiresSecond = ConfigUtil.getApolloValue(TOKEN_EXPIRE_SECOND, 25) * 60;
            commonConf.bucket = getValueByAppId(bucket, appId);
            commonConf.host = HTTP + domain;
            commonConf.retryTimes = 3;
            commonConf.expiresSecond = expiresSecond;
            commonConf.callbackHost = getValueByAppId(callbackHost, appId);
            commonConf.callbackUrl = commonConf.callbackHost + getValueByAppId(callbackUrl, appId);
            commonConf.callbackVideoUrl = commonConf.callbackHost + getValueByAppId(callbackVideoUrl, appId);
            // Callback URLs need a scheme; default to http:// when the host does not carry one.
            if (!commonConf.callbackHost.contains(HTTP) && !commonConf.callbackHost.contains(HTTPS)) {
                commonConf.callbackUrl = HTTP + commonConf.callbackUrl;
                commonConf.callbackVideoUrl = HTTP + commonConf.callbackVideoUrl;
            }
            commonConf.callbackBody = getValueByAppId(callbackBody, appId);
            commonConf.callbackBodyType = getValueByAppId(callbackBodyType, appId);
            commonConf.callbackDesKey = getValueByAppId(callbackDesKey, appId);
            commonConf.videoOps = getValueByAppId(videoOps, appId);
            commonConf.videoPipeline = getValueByAppId(videoPipeline, appId);
            return commonConf;
        } catch (Exception e) {
            // Log the cause as well; without it a misconfiguration is impossible to diagnose.
            log.warn("加载共用配置错误 appId:{}", appId, e);
            return null;
        }
    }

    /**
     * Picks the value that applies to {@code appId} out of a configuration value.
     *
     * <p>A value without a comma applies to every app id. Otherwise it is a comma-separated
     * list of {@code APPID|value} entries, matched case-insensitively on the app id.
     *
     * @param value raw configuration value
     * @param appId application id; empty means the default app id
     * @return the entry for {@code appId}; failing that, the entry of the default app id;
     *         {@code null} when neither is present
     */
    private static String getValueByAppId(String value, String appId) {
        if (!value.contains(",")) {
            return value;
        }
        if (StringUtils.isEmpty(appId)) {
            appId = DEFAULT_APP_ID;
        }
        String appIdPrefix = appId.toUpperCase() + "|";
        // Derived from the constant instead of a hard-coded length.
        String defaultPrefix = DEFAULT_APP_ID + "|";
        String defaultValue = null;
        for (String appIdValue : value.split(",")) {
            String upperCased = appIdValue.toUpperCase();
            if (upperCased.startsWith(appIdPrefix)) {
                return appIdValue.substring(appIdPrefix.length());
            }
            if (upperCased.startsWith(defaultPrefix)) {
                defaultValue = appIdValue.substring(defaultPrefix.length());
            }
        }
        return defaultValue;
    }
}
public static class QiniuConf {
// Settings shared by all devices of this device's app id.
private CommonConf commonConf;
// Device identifier this configuration belongs to.
private String imei;
// Current upload token.
private String token;
// Signature of the file being received; compared per packet by verifyFile.
private String verifyFile;
// Remote file name (key) of the file being received; cleared when the upload state is reset.
private String fileName;
// Timestamp embedded in the file name and in the token's end-user string.
private LocalDateTime fileNameTime;
// fileNameTime formatted as a day; used as the temp sub-directory name.
private String fileDay;
// Leading component of the generated file name; defaults to the configured value.
private Integer expiredDay;
private String taskId;
// When not set explicitly, derived from the part of taskId before '@'.
private String platformId;
// Plain-text end-user string that is encrypted into the upload token policy.
private String endUser;
private String downloadUrl;
private FileTypeEnum fileTypeEnum;
// Epoch milliseconds from which the token is considered due for renewal.
private Long expiresSecondTime;
private boolean upload = false;
// Next write position in the local temp file.
private int offset;
// Local temp file of the current upload, when local ordering is used.
private File file;
// Cached decision whether packets are written to the local temp file; null = not decided yet.
private Boolean localOrdering;
// Instances are created through build(String).
private QiniuConf() {
}
/**
 * Creates the configuration for a device and attaches the shared settings of its app id.
 *
 * @param imei device identifier
 * @return a new configuration bound to {@code imei}
 * @throws RuntimeException when the shared settings cannot be loaded; the cache stores an
 *         empty Optional for a failed load, which is reported here with the IMEI instead of
 *         surfacing as a bare NoSuchElementException
 */
public static QiniuConf build(String imei) {
    QiniuConf qiniuConf = new QiniuConf();
    qiniuConf.imei = imei;
    try {
        Optional<CommonConf> loaded = APP_ID_COMMON_CONF_MAP_CACHE.get(QiniuConf.getAppId(qiniuConf.imei));
        qiniuConf.commonConf = loaded.orElseThrow(() -> new RuntimeException("Qiniu云配置信息错误 imei:" + imei));
    } catch (ExecutionException e) {
        throw new RuntimeException("Qiniu云配置信息错误 imei:" + qiniuConf.imei, e);
    }
    return qiniuConf;
}
/** Sets the token; an empty value is ignored when a token is already present. */
public QiniuConf token(String token) {
if (StringUtils.isEmpty(token) && StringUtils.isNotEmpty(this.token)) {
return this;
}
this.token = token;
return this;
}
/** Sets the recorded file signature. */
public QiniuConf verifyFile(String verifyFile) {
this.verifyFile = verifyFile;
return this;
}
/** Sets the remote file name. */
public QiniuConf fileName(String fileName) {
this.fileName = fileName;
return this;
}
/** Sets the file type; null is ignored when a type is already present. */
public QiniuConf fileTypeEnum(FileTypeEnum fileTypeEnum) {
if (fileTypeEnum == null && this.fileTypeEnum != null) {
return this;
}
this.fileTypeEnum = fileTypeEnum;
return this;
}
/** Sets the value used as the leading component of generated file names. */
public QiniuConf expiredDay(Integer expiredDay) {
this.expiredDay = expiredDay;
return this;
}
/** Sets the task id. */
public QiniuConf taskId(String taskId) {
this.taskId = taskId;
return this;
}
/** Sets the platform id. */
public QiniuConf platformId(String platformId) {
this.platformId = platformId;
return this;
}
/** Sets the epoch-millisecond instant from which the token is due for renewal. */
public QiniuConf expiresSecondTime(Long expiresSecondTime) {
this.expiresSecondTime = expiresSecondTime;
return this;
}
/** Returns the device identifier. */
public String getImei() {
return imei;
}
/** Returns the current upload token; may be null. */
public String getToken() {
return token;
}
/** Returns the epoch-millisecond instant from which the token is due for renewal; may be null. */
public Long getExpiresSecondTime() {
return expiresSecondTime;
}
/** Returns the remote file name of the file being received; null after the upload state was reset. */
public String getFileName() {
return fileName;
}
/** Returns the task id; may be null. */
public String getTaskId() {
return taskId;
}
/** Returns the platform id; may be null. */
public String getPlatformId() {
return platformId;
}
/** Returns the plain-text end-user string of the last generated token; may be null. */
public String getEndUser() {
return endUser;
}
/** Returns the download URL of the current file; null after the upload state was reset. */
public String getDownloadUrl() {
return downloadUrl;
}
/** Returns the file type; null after the upload state was reset. */
public FileTypeEnum getFileTypeEnum() {
return fileTypeEnum;
}
/**
 * Fills in whatever is missing for an upload: a default file type, a generated file name,
 * the platform id derived from the task id, the download URL and a fresh token when there is
 * none or the current one is due for renewal.
 *
 * <p>Failures are logged and swallowed; callers detect them through an empty token or file
 * name afterwards.
 *
 * @return this instance
 */
private QiniuConf builder() {
    try {
        if (this.fileTypeEnum == null) {
            log.info("上传文件类型为空,使用默认信息");
            fileTypeEnum = FileTypeEnum.AUDIO_ALARM;
        }
        if (this.fileName == null) {
            if (this.fileNameTime == null) {
                this.fileNameTime = LocalDateTime.now();
                this.fileDay = this.fileNameTime.format(DATE_TIME_FORMATTER_DAY);
            }
            updateFileName(this.fileTypeEnum);
        }
        if (StringUtils.isNotEmpty(taskId) && StringUtils.isEmpty(platformId)) {
            this.platformId = taskId.indexOf("@") > 0 ? taskId.substring(0, taskId.indexOf("@")) : "";
        }
        this.downloadUrl = StringUtils.defaultString(this.downloadUrl, HTTP + ConfigUtil.getApolloValue(DOWN_LOAD_URL, "xxx.xxx.xxxx") + "/" + this.fileName);
        // A token without a recorded renewal instant is treated as due for renewal instead of
        // failing on unboxing a null Long.
        boolean tokenDue = StringUtils.isEmpty(token)
                || expiresSecondTime == null
                || System.currentTimeMillis() >= expiresSecondTime;
        if (tokenDue) {
            // The cache stores an empty Optional for a failed load; keep the previous settings
            // in that case and report the problem.
            CommonConf refreshed = APP_ID_COMMON_CONF_MAP_CACHE.get(QiniuConf.getAppId(this.imei)).orElse(null);
            if (refreshed == null) {
                log.warn("获取Qiniu云公共配置为空 imei:{}", imei);
                throw new RuntimeException("获取七牛云公共配置失败 imei:" + imei);
            }
            this.commonConf = refreshed;
            getToken(this, this.fileNameTime);
        }
    } catch (Exception e) {
        // The old format string had no placeholder, so its context argument was never printed;
        // log the identifying fields rather than the whole object, which contains the token.
        log.warn("获取TOKEN信息失败 imei:{} fileName:{}", imei, fileName, e);
    }
    return this;
}
/**
 * Regenerates the remote file name for the given file type, then refreshes the token and
 * the download URL to match.
 *
 * The name has the form {@code <expiredDay>-<imei>-<fileType>-<timestamp><suffix>}, where
 * {@code <fileType>} is the enum's {@code toString()} value. {@code expiredDay} is read
 * from configuration (default 186) when it has not been set.
 *
 * NOTE(review): this calls {@code getToken} before {@code builder()} has loaded
 * {@code commonConf}; if {@code commonConf} is still null here, that call only logs an
 * error and returns null — confirm whether the call is needed at this point.
 *
 * @param fileTypeEnum file type that determines the type segment and the suffix
 * @return the newly generated file name
 */
private String updateFileName(FileTypeEnum fileTypeEnum) {
    if (this.expiredDay == null) {
        this.expiredDay = ConfigUtil.getApolloValue(EXPIRED_DAY, 186);
    }

    StringBuilder generatedName = new StringBuilder()
            .append(this.expiredDay).append("-")
            .append(this.imei).append("-")
            .append(fileTypeEnum).append("-")
            .append(this.fileNameTime.format(DATE_TIME_FORMATTER_FILE))
            .append(fileTypeEnum.suffix);
    this.fileName = generatedName.toString();

    getToken(this, this.fileNameTime);

    String downloadHost = ConfigUtil.getApolloValue(DOWN_LOAD_URL, "statics.aichezaixian.com");
    this.downloadUrl = HTTP + downloadHost + "/" + this.fileName;
    return this.fileName;
}
/**
 * Generates an upload token for the given configuration and stores it, together with the
 * plain-text {@code endUser} string and the token expiry instant, on {@code qiniuConf}.
 *
 * The {@code endUser} string has the form
 * {@code imei#platformId#trigger#time#taskId#fileType}, where {@code trigger} is
 * {@code MANUAL} for {@link FileTypeEnum#AUDIO_ORDINARY} and {@code PHONIC} for every other
 * file type; null platform/task ids become empty strings. It is DES-encrypted before being
 * placed into the upload policy.
 *
 * The stored expiry instant is five minutes earlier than the real token lifetime so that a
 * token is refreshed before it actually expires.
 *
 * @param qiniuConf     configuration to read from and to update; its {@code commonConf}
 *                      must already be loaded
 * @param localDateTime time embedded in the {@code endUser} string
 * @return the generated token, or {@code null} when a required input is missing or token
 *         generation fails (the failure is logged, never thrown)
 */
public static String getToken(QiniuConf qiniuConf, LocalDateTime localDateTime) {
    // Explicit guards instead of relying on a caught NullPointerException further down.
    if (qiniuConf == null || qiniuConf.commonConf == null || localDateTime == null) {
        log.error("获取Qiniu云TOKEN错误: 公共配置或时间为空");
        return null;
    }
    try {
        if (StringUtils.isEmpty(qiniuConf.commonConf.bucket)) {
            log.error("获取Qiniu云TOKEN时bucket 不能为空");
            return null;
        }
        Des des = new Des(qiniuConf.commonConf.callbackDesKey);
        String trigger = qiniuConf.fileTypeEnum == FileTypeEnum.AUDIO_ORDINARY
                ? TriggerTypeEnum.MANUAL.value
                : TriggerTypeEnum.PHONIC.value;
        // Local, single-threaded use: StringBuilder is sufficient.
        qiniuConf.endUser = new StringBuilder(qiniuConf.imei)
                .append("#").append(Optional.ofNullable(qiniuConf.platformId).orElse(""))
                .append("#").append(trigger)
                .append("#").append(localDateTime.format(DATE_TIME_FORMATTER_TOKEN))
                .append("#").append(Optional.ofNullable(qiniuConf.taskId).orElse(""))
                .append("#").append(qiniuConf.fileTypeEnum.type)
                .toString();
        StringMap policy = new StringMap()
                .put("callbackUrl", qiniuConf.commonConf.callbackUrl)
                .put("callbackHost", qiniuConf.commonConf.callbackHost)
                .put("callbackBody", qiniuConf.commonConf.callbackBody)
                .put("persistentOps", qiniuConf.commonConf.videoOps)
                .put("persistentPipeline", qiniuConf.commonConf.videoPipeline)
                .put("persistentNotifyUrl", qiniuConf.commonConf.callbackVideoUrl)
                .put("endUser", des.encrypt(qiniuConf.endUser));
        qiniuConf.token = AUTH.uploadToken(qiniuConf.commonConf.bucket, null, qiniuConf.commonConf.expiresSecond, policy);
        if (StringUtils.isEmpty(qiniuConf.token)) {
            log.error("获取七牛云TOKEN失败 imei:{}", qiniuConf.imei);
            return null;
        }
        // 1000L forces 64-bit arithmetic so long token lifetimes cannot overflow an int
        // before being added to the current time.
        qiniuConf.expiresSecondTime = System.currentTimeMillis() + (qiniuConf.commonConf.expiresSecond - 5 * 60) * 1000L;
        return qiniuConf.token;
    } catch (Exception e) {
        log.error("获取Qiniu云TOKEN错误", e);
        return null;
    }
}
/**
 * Resolves the app id for a device.
 *
 * The per-IMEI cache is consulted only when the configured app-id value differs from
 * {@code DEFAULT_APP_ID}. An empty lookup result falls back to {@code DEFAULT_APP_ID}.
 *
 * @param imei device IMEI
 * @return the resolved app id, or {@code DEFAULT_APP_ID} when nothing more specific is found
 */
private static String getAppId(String imei) {
    String configuredAppIds = ConfigUtil.getApolloValue(APP_ID_KEY, DEFAULT_APP_ID);
    boolean onlyDefaultConfigured = configuredAppIds.equals(DEFAULT_APP_ID);

    String resolved = null;
    if (!onlyDefaultConfigured) {
        resolved = DcImeiAppIdCache.getAppIdWithCache(imei);
        log.info("获取 imei:{} appId:{}", imei, resolved);
    }
    return StringUtils.isEmpty(resolved) ? DEFAULT_APP_ID : resolved;
}
/**
 * Renders the main fields of this configuration for diagnostics.
 * Note that the output includes the upload token.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("QiniuConf{");
    text.append("token='").append(token).append('\'');
    text.append(", imei='").append(imei).append('\'');
    text.append(", fileName='").append(fileName).append('\'');
    text.append(", taskId='").append(taskId).append('\'');
    text.append(", platformId='").append(platformId).append('\'');
    text.append(", endUser='").append(endUser).append('\'');
    text.append(", downloadUrl='").append(downloadUrl).append('\'');
    text.append(", fileTypeEnum=").append(fileTypeEnum);
    text.append(", expiresSecondTime=").append(expiresSecondTime);
    text.append('}');
    return text.toString();
}
}
/**
 * File types that can be uploaded, each carrying a numeric type code and a file suffix.
 *
 * Type codes are not unique (for example {@code IMG} and {@code PNG} both use 0x00), and
 * the suffix {@code ".amr"} is shared by {@code AUDIO_ORDINARY} and {@code AUDIO_ALARM}.
 *
 * Both properties are mutable through the setters; because enum constants are singletons,
 * a change made through a setter is visible to every user of that constant.
 */
public enum FileTypeEnum {
    IMG((byte) 0x00, ".jpg"),
    PNG((byte) 0x00, ".png"),
    AUDIO((byte) 0x01, ".wav"),
    AUDIO_ORDINARY((byte) 0x03, ".amr"),
    AUDIO_ALARM((byte) 0x02, ".amr"),
    MP4((byte) 0x02, ".mp4"),
    TEXT((byte) 0x03, ".bin"),
    OTHER((byte) 0x04, ".tmp");

    /** Numeric type code of this file type. */
    byte type;
    /** File suffix, including the leading dot. */
    String suffix;

    FileTypeEnum(byte type, String suffix) {
        this.type = type;
        this.suffix = suffix;
    }

    /**
     * Finds the first constant, in declaration order, whose suffix matches case-insensitively.
     *
     * @param suffix suffix to look for, including the leading dot; may be {@code null}
     * @return the matching constant, or {@link #OTHER} when nothing matches
     */
    public static FileTypeEnum getSuffixEnum(String suffix) {
        FileTypeEnum[] candidates = FileTypeEnum.values();
        for (int i = 0; i < candidates.length; i++) {
            FileTypeEnum candidate = candidates[i];
            if (candidate.getSuffix().equalsIgnoreCase(suffix)) {
                return candidate;
            }
        }
        return OTHER;
    }

    /** @return the numeric type code */
    public byte getType() {
        return this.type;
    }

    /** @return the file suffix, including the leading dot */
    public String getSuffix() {
        return this.suffix;
    }

    /** @param type new numeric type code for this constant */
    public void setType(byte type) {
        this.type = type;
    }

    /** @param value new file suffix for this constant */
    public void setSuffix(String value) {
        this.suffix = value;
    }

    /**
     * Returns {@code <type>-<hash of suffix>}. This value is used as a file-name segment,
     * so it distinguishes constants that share a type code but differ in suffix.
     */
    @Override
    public String toString() {
        return Byte.toString(this.type) + "-" + Integer.toString(this.suffix.hashCode());
    }
}
/**
 * Ways in which a recording/upload can be triggered. Each constant carries a code
 * ({@code value}) and a human-readable description ({@code name}).
 */
public enum TriggerTypeEnum {
    MANUAL("MANUAL", "发送指令触发"),
    ACTIVE("ACTIVE", "自动触发统称"),
    SHAKE("SHAKE", "震动触发"),
    PHONIC("PHONIC", "声控触发");

    /** Code of this trigger type. */
    private String value;
    /** Human-readable description of this trigger type. */
    private String name;

    TriggerTypeEnum(String value, String name) {
        this.value = value;
        this.name = name;
    }

    /** @return the human-readable description */
    @Override
    public String toString() {
        return this.name;
    }

    /** @return the code of this trigger type */
    public String getValue() {
        return this.value;
    }

    /** @return the human-readable description */
    public String getName() {
        return this.name;
    }

    /**
     * Tells whether the given code belongs to one of the constants (case-sensitive).
     *
     * @param value code to check, may be {@code null}
     * @return {@code true} when a constant with exactly this code exists
     */
    public static boolean ifTriggerType(String value) {
        return lookup(value) != null;
    }

    /**
     * Resolves a code to its constant (case-sensitive).
     *
     * @param value code to resolve, may be {@code null}
     * @return the matching constant, or {@link #MANUAL} when the code is unknown
     */
    public static TriggerTypeEnum getTriggerType(String value) {
        TriggerTypeEnum match = lookup(value);
        return match != null ? match : MANUAL;
    }

    /** Returns the constant whose code equals {@code value}, or {@code null} when none does. */
    private static TriggerTypeEnum lookup(String value) {
        for (TriggerTypeEnum candidate : TriggerTypeEnum.values()) {
            if (candidate.getValue().equals(value)) {
                return candidate;
            }
        }
        return null;
    }
}
/**
 * Fluent holder for the data and options of one upload request.
 *
 * Instances are created with {@link #build()} and populated through the chained setters.
 * NOTE(review): the numeric fields presumably describe one fragment of a file transferred
 * in several parts (position, sizes, part counters) — confirm against the callers.
 */
public static class FileData {
    /** Starts at -1 until a caller sets it. */
    private int offset = -1;
    private int length;
    private int totalCount;
    private int currentIndex;
    private int currentLength;
    private byte[] content;
    private Boolean promptlyUpload;
    private File uploadFile;
    private Boolean localOrdering;
    private AdjustByteOrder adjustByteOrder;

    /** Use {@link #build()} to obtain an instance. */
    private FileData() {
    }

    /** @return a new, empty instance */
    public static FileData build() {
        return new FileData();
    }

    /** Sets the offset and returns this instance for chaining. */
    public FileData offset(int offset) {
        this.offset = offset;
        return this;
    }

    /** Sets the length and returns this instance for chaining. */
    public FileData length(int length) {
        this.length = length;
        return this;
    }

    /** Sets the total count and returns this instance for chaining. */
    public FileData totalCount(int totalCount) {
        this.totalCount = totalCount;
        return this;
    }

    /** Sets the current index and returns this instance for chaining. */
    public FileData currentIndex(int currentIndex) {
        this.currentIndex = currentIndex;
        return this;
    }

    /** Sets the current length and returns this instance for chaining. */
    public FileData currentLength(int currentLength) {
        this.currentLength = currentLength;
        return this;
    }

    /** Stores the given array as-is (no copy is made) and returns this instance for chaining. */
    public FileData content(byte[] content) {
        this.content = content;
        return this;
    }

    /**
     * Sets the file to upload and, as a side effect, switches {@code promptlyUpload} on.
     *
     * @return this instance for chaining
     */
    public FileData uploadFile(File uploadFile) {
        this.uploadFile = uploadFile;
        this.promptlyUpload = Boolean.TRUE;
        return this;
    }

    /** Sets the promptly-upload flag and returns this instance for chaining. */
    public FileData promptlyUpload(Boolean promptlyUpload) {
        this.promptlyUpload = promptlyUpload;
        return this;
    }

    /** Sets the local-ordering flag and returns this instance for chaining. */
    public FileData localOrdering(Boolean localOrdering) {
        this.localOrdering = localOrdering;
        return this;
    }

    /** @return the offset, -1 when never set */
    public int getOffset() {
        return this.offset;
    }

    /** @return the length */
    public int getLength() {
        return this.length;
    }

    /** @return the total count */
    public int getTotalCount() {
        return this.totalCount;
    }

    /** @return the current index */
    public int getCurrentIndex() {
        return this.currentIndex;
    }

    /** @return the current length */
    public int getCurrentLength() {
        return this.currentLength;
    }

    /** @return the stored content array itself (not a copy), may be {@code null} */
    public byte[] getContent() {
        return this.content;
    }

    /** @return the promptly-upload flag, {@code null} when never set */
    public Boolean getPromptlyUpload() {
        return this.promptlyUpload;
    }

    /** @return the file to upload, may be {@code null} */
    public File getUploadFile() {
        return this.uploadFile;
    }
}
/**
 * Holds a pair of temporary first/end indexes and can tell whether a given first index
 * differs from the stored one.
 */
private static class AdjustByteOrder {
    private Long tempFirstIndex;
    private Long tempEndIndex;

    /**
     * @param tempFirstIndex stored first index
     * @param tempEndIndex   stored end index
     */
    public AdjustByteOrder(long tempFirstIndex, long tempEndIndex) {
        this.tempFirstIndex = tempFirstIndex;
        this.tempEndIndex = tempEndIndex;
    }

    /**
     * @param firstIndex index to compare with the stored first index
     * @return {@code true} when a first index is stored and its value differs from
     *         {@code firstIndex}; {@code false} when none is stored or the values are equal
     */
    private boolean isOrganize(long firstIndex) {
        return this.tempFirstIndex != null && this.tempFirstIndex.longValue() != firstIndex;
    }
}
}