diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java index 8e6413bc41820de8af6727efe98ece6610025c72..749b12a4855fccd6e4f8f72540ef408573a637ab 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java @@ -400,11 +400,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { boolean reduceSinkCanReplace = false; for (BaseWork child : tezWork.getChildren(work)) { TezEdgeProperty.EdgeType edgeType = tezWork.getEdgeType(work, child); - if ((edgeType == TezEdgeProperty.EdgeType.ONE_TO_ONE_EDGE - || edgeType == TezEdgeProperty.EdgeType.CUSTOM_SIMPLE_EDGE - || edgeType == TezEdgeProperty.EdgeType.BROADCAST_EDGE - || edgeType == TezEdgeProperty.EdgeType.XPROD_EDGE) - && isReplaceable(child.getAnyRootOperator(), true)) { + if (isSupportEdgeType(edgeType) && isReplaceable(child.getAnyRootOperator(), true)) { reduceSinkCanReplace = true; } if (edgeType == TezEdgeProperty.EdgeType.SIMPLE_EDGE && child instanceof ReduceWork) { @@ -444,6 +440,13 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { return reduceSinkCanReplace; } + private boolean isSupportEdgeType(TezEdgeProperty.EdgeType edgeType) { + return edgeType == TezEdgeProperty.EdgeType.ONE_TO_ONE_EDGE + || edgeType == TezEdgeProperty.EdgeType.CUSTOM_SIMPLE_EDGE + || edgeType == TezEdgeProperty.EdgeType.BROADCAST_EDGE + || edgeType == TezEdgeProperty.EdgeType.XPROD_EDGE; + } + private void setParentWorkToCurrentWorkEdge(BaseWork work, TezEdgeProperty.EdgeType edgeType) { for (BaseWork parent : tezWork.getParents(work)) { tezWork.getEdgeProperty(parent, work).setEdgeType(edgeType); @@ -712,7 +715,10 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { @Override public boolean equals(Object obj) { - return (this.operator == ((OperatorInfo) obj).getOperator()); + if (obj instanceof OperatorInfo) { + return (this.operator == ((OperatorInfo) obj).getOperator()); + } + return false; } @Override diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java index b03532bc78fd6a361ba246f3a91649dc5a4662f5..6938052b0ddfff8f2526ab036b55a1320a73be53 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java @@ -67,16 +67,10 @@ public class OmniGroupByOperator extends OmniHiveOperator imple private transient ObjectInspector[] currentKeyObjectInspectors; private transient ExprNodeEvaluator[][] aggregationParameterFields; private transient ObjectInspector[][] aggregationParameterObjectInspectors; - private transient GenericUDAFEvaluator[] aggregationEvaluators; private transient List allStructFieldRefs; - private transient List keyStructFieldRefs; private transient int numKeys; - private transient List valueStructFieldRefs; private transient boolean groupingSetsPresent; - private transient int groupingSetsPosition; private transient List groupingSets; - private transient FastBitSet[] groupingSetsBitSet; - private transient LongWritable[] newKeysGroupingSets; private transient int outputKeyLength; private VectorizationContext vectorizationContext; private VectorizationContext vOutContext; @@ -156,7 +150,7 @@ public class OmniGroupByOperator extends OmniHiveOperator imple } // build outputDataType - aggregationEvaluators = new GenericUDAFEvaluator[conf.getAggregators().size()]; + GenericUDAFEvaluator[] aggregationEvaluators = new GenericUDAFEvaluator[conf.getAggregators().size()]; for (int i = 0; i < aggregationEvaluators.length; i++) { AggregationDesc agg = conf.getAggregators().get(i); aggregationEvaluators[i] = agg.getGenericUDAFEvaluator(); @@ -214,9 +208,9 @@ public class OmniGroupByOperator extends OmniHiveOperator imple groupingSetsPresent = conf.isGroupingSetsPresent(); if (groupingSetsPresent) { groupingSets = conf.getListGroupingSets(); - groupingSetsPosition = conf.getGroupingSetPosition(); - newKeysGroupingSets = new LongWritable[groupingSets.size()]; - groupingSetsBitSet = new FastBitSet[groupingSets.size()]; + int groupingSetsPosition = conf.getGroupingSetPosition(); + LongWritable[] newKeysGroupingSets = new LongWritable[groupingSets.size()]; + FastBitSet[] groupingSetsBitSet = new FastBitSet[groupingSets.size()]; int pos = 0; for (Long groupingSet : groupingSets) { // Create the mapping corresponding to the grouping set @@ -227,9 +221,11 @@ public class OmniGroupByOperator extends OmniHiveOperator imple } if ((allStructFieldRefs.size() == 2) && (allStructFieldRefs.get(0).getFieldName().equals("key"))) { - keyStructFieldRefs = ((StandardStructObjectInspector) allStructFieldRefs.get(0).getFieldObjectInspector()) + List keyStructFieldRefs = + ((StandardStructObjectInspector) allStructFieldRefs.get(0).getFieldObjectInspector()) .getAllStructFieldRefs(); - valueStructFieldRefs = ((StandardStructObjectInspector) allStructFieldRefs.get(1).getFieldObjectInspector()) + List valueStructFieldRefs = + ((StandardStructObjectInspector) allStructFieldRefs.get(1).getFieldObjectInspector()) .getAllStructFieldRefs(); groupByChanel = getExprFromStructField(keyStructFieldRefs); aggChannels = getTwoDimenExprFromStructField(valueStructFieldRefs); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveOperator.java index 1b03ed0eb984db85d30617efb9779d89b22efd87..ff84ce1bd833b3ae8b8ccd9af077e082f8090d92 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveOperator.java @@ -102,11 +102,9 @@ public abstract class OmniHiveOperator extends Operator< @Override public void startGroup() throws HiveException { - return; } @Override public void endGroup() throws HiveException { - return; } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java index d6dfcf889caa1f0ed0f25fe1c1d94a15543d82fa..7dc64edc80e0624db514a559b86be4665888141d 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java @@ -834,12 +834,10 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator @Override public void startGroup() throws HiveException { - return; } @Override public void endGroup() throws HiveException { - return; } private static class OmniReaderWrapper { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java index 69f72b829c92ef9ecfc72f99f115f507b51f9ea4..19d0a06527d8a902b2bbad46b0842013b0fd90e0 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMergeJoinOperator.java @@ -111,11 +111,9 @@ public class OmniMergeJoinOperator extends OmniJoinOperator { @Override public void startGroup() throws HiveException { - return; } @Override public void endGroup() throws HiveException { - return; } } \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java index 0ff9f06d9b3e662e6f069953b6ec84403552b47f..ecc8e89f361ca436a80086f7f8d3362d2bee31b6 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniPTFOperator.java @@ -48,10 +48,9 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se private static final Logger LOG = LoggerFactory.getLogger(OmniPTFOperator.class.getName()); private transient OmniWindowOperatorFactory omniWindowOperatorFactory; private transient OmniOperator omniOperator; - private transient List allStructFieldRefs; + boolean isMapOperator; - private transient WindowTableFunctionDef windowTableFunctionDef; transient Configuration hiveConf; public OmniPTFOperator() { @@ -73,7 +72,7 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se hiveConf = jobConf; isMapOperator = conf.isMapSide(); reconstructQueryDef(hiveConf); - windowTableFunctionDef = (WindowTableFunctionDef) conf.getFuncDef(); + WindowTableFunctionDef windowTableFunctionDef = (WindowTableFunctionDef) conf.getFuncDef(); List windowFunctions = windowTableFunctionDef.getWindowFunctions(); List windowOrderExpressions = windowTableFunctionDef.getOrder().getExpressions(); @@ -84,7 +83,8 @@ public class OmniPTFOperator extends OmniHiveOperator implements Se outputObjInspector = conf.getFuncDef().getOutputShape().getOI(); } - allStructFieldRefs = ((StandardStructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs(); + List allStructFieldRefs = ((StandardStructObjectInspector) inputObjInspectors[0]) + .getAllStructFieldRefs(); DataType[] sourceTypes = getExprFromStructField(allStructFieldRefs); int[] outputChannels = getOutputChannels((StandardStructObjectInspector) inputObjInspectors[0], diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java index cf70011b05e7ffbbe8e8dab8f4b1b6f83ff77d5c..214db86557588802d9e59ca61330e71b8b03643b 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniReduceSinkOperator.java @@ -116,7 +116,6 @@ public class OmniReduceSinkOperator extends TerminalOperator */ private transient int buckColIdxInKeyForSdpo = -1; private boolean firstRow; - private transient int tag; private boolean skipTag = false; private transient int[] valueIndex; // index for value(+ from keys, - from values) @@ -194,14 +193,11 @@ public class OmniReduceSinkOperator extends TerminalOperator private long[] keyFieldId; private long[] valueFieldId; private transient boolean needProject; - private transient OmniProjectOperatorFactory projectOperatorFactory; private transient OmniOperator projectOperator; private transient boolean reduceSinkCanReplaceKey; private transient VecWrapper[] vecWrappers; - private transient Iterator output; - /** * Kryo ctor. */ @@ -316,7 +312,7 @@ public class OmniReduceSinkOperator extends TerminalOperator buckColIdxInKey = conf.getPartitionCols().size(); } - tag = conf.getTag(); + int tag = conf.getTag(); tagByte[0] = (byte) tag; skipTag = conf.getSkipTag(); if (LOG.isInfoEnabled()) { @@ -395,9 +391,9 @@ public class OmniReduceSinkOperator extends TerminalOperator } } } - this.projectOperatorFactory = new OmniProjectOperatorFactory(expressions, inputTypes, 1, + OmniProjectOperatorFactory projectOperatorFactory = new OmniProjectOperatorFactory(expressions, inputTypes, 1, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(), true)); - this.projectOperator = this.projectOperatorFactory.createOperator(); + this.projectOperator = projectOperatorFactory.createOperator(); for (int i = 0; i < conf.getKeyCols().size(); i++) { keyFieldId[i] = i; } @@ -454,7 +450,7 @@ public class OmniReduceSinkOperator extends TerminalOperator VecBatch input = (VecBatch) row; if (needProject) { this.projectOperator.addInput(input); - output = this.projectOperator.getOutput(); + Iterator output = this.projectOperator.getOutput(); while (output.hasNext()) { processVecbatch(output.next(), tag); } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java index cb89f913065a915aa90ee6f001d1811b8127a19f..5d7eb491b24b874d719bb90f973a0e1894517507 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorWithSortOperator.java @@ -180,6 +180,12 @@ public class OmniVectorWithSortOperator extends OmniHiveOperator return; } forward(outputs[posBigTable].next(), posBigTable); + for (OmniSortOperatorFactory sortOperatorFactory : sortOperatorFactories) { + sortOperatorFactory.close(); + } + for (OmniOperator sortOperator : sortOperators) { + sortOperator.close(); + } super.close(abort); } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedMergeJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedMergeJoinOperator.java deleted file mode 100644 index ace4531ecfd056cf88ff2ee1eb30f86f0d1b6b72..0000000000000000000000000000000000000000 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedMergeJoinOperator.java +++ /dev/null @@ -1,115 +0,0 @@ - -package com.huawei.boostkit.hive; - -import nova.hetu.omniruntime.operator.OmniOperator; -import nova.hetu.omniruntime.type.DataType; -import nova.hetu.omniruntime.vector.VecBatch; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.MapredContext; -import org.apache.hadoop.hive.ql.exec.tez.TezContext; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; - -import java.util.Queue; - -public class OmniVectorizedMergeJoinOperator extends OmniJoinOperator { - private int posBigTable; - private OmniVectorWithSortOperator omniVectorOperator; - - /** Kryo ctor. */ - protected OmniVectorizedMergeJoinOperator() { - super(); - } - - public OmniVectorizedMergeJoinOperator(CompilationOpContext ctx) { - super(ctx); - } - - public OmniVectorizedMergeJoinOperator(CompilationOpContext ctx, CommonMergeJoinDesc commonMergeJoinDesc) { - super(ctx); - this.conf = new OmniMergeJoinDesc(commonMergeJoinDesc); - this.posBigTable = commonMergeJoinDesc.getPosBigTable(); - } - - @Override - // If mergeJoinOperator has 3 tables, first join table0 and table1, and output - // all columns of table0 and table1. - // Then use the output to join table2, and output required columns. - protected void initializeOp(Configuration hconf) throws HiveException { - super.initializeOp(hconf); - sources = ((TezContext) MapredContext.get()).getRecordSources(); - omniVectorOperator = (OmniVectorWithSortOperator) parentOperators.get(0); - } - - @Override - public void process(Object row, int tag) throws HiveException { - VecBatch input = (VecBatch) row; - if (flowControlCode[flowControlCode.length - 1] == SCAN_FINISH) { - input.releaseAllVectors(); - input.close(); - setDone(true); - return; - } - if (tag == 0 && flowControlCode[0] != SCAN_FINISH) { - streamData[0].offer(input); - } else if (tag >= 1 && flowControlCode[tag - 1] != SCAN_FINISH) { - bufferData[tag - 1].offer(input); - } else { - input.releaseAllVectors(); - input.close(); - return; - } - if (tag == posBigTable) { - processOmni(0, 1); - for (int opIndex = 1; opIndex < streamFactories.length; opIndex++) { - if (!streamData[opIndex].isEmpty()) { - processOmni(opIndex, opIndex + 1); - } - } - } - } - - @Override - protected void processOmniSmj(int opIndex, int dataIndex, Queue[] data, OmniOperator[] operators, - int controlCode, DataType[][] types) throws HiveException { - if (!data[opIndex].isEmpty()) { - while (flowControlCode[opIndex] == controlCode && resCode[opIndex] == RES_INIT - && !data[opIndex].isEmpty()) { - setStatus(operators[opIndex].addInput(data[opIndex].poll()), opIndex); - } - return; - } - if (opIndex == dataIndex && opIndex > 0 && flowControlCode[opIndex - 1] == SCAN_FINISH) { - setStatus(operators[opIndex].addInput(createEofVecBatch(types[opIndex])), opIndex); - return; - } - - if (omniVectorOperator.outputs[dataIndex].hasNext()) { - while (flowControlCode[opIndex] == controlCode && resCode[opIndex] == RES_INIT - && omniVectorOperator.outputs[dataIndex].hasNext()) { - omniVectorOperator.pushRecord(dataIndex); - if (!data[opIndex].isEmpty()) { - setStatus(operators[opIndex].addInput(data[opIndex].poll()), opIndex); - } - } - } else if (!omniVectorOperator.outputs[dataIndex].hasNext()) { - setStatus(operators[opIndex].addInput(createEofVecBatch(types[opIndex])), opIndex); - } - } - - public int getPosBigTable() { - return posBigTable; - } - - @Override - public void startGroup() throws HiveException { - return; - } - - @Override - public void endGroup() throws HiveException { - return; - } -} \ No newline at end of file diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java index 32900f3501a972f9c68e73ffd64fd68c23ca6a81..ca1b92ee6b3aa287fe6846ff5c53c94f4b9b5731 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniVectorizedVectorOperator.java @@ -112,12 +112,7 @@ public class OmniVectorizedVectorOperator extends OmniHiveOperator SUPPORT_BRIDGE_UDF = new ArrayList() { + public static final List SUPPORT_BRIDGE_UDF = new ArrayList() { { add("UDFToDouble"); add("UDFToInteger"); @@ -31,31 +31,38 @@ public class BridgeExpressionProcessor implements ExpressionProcessor { @Override public BaseExpression process(ExprNodeGenericFuncDesc node, String operator, ObjectInspector inspector) { BaseExpression leaf = null; - if (node.getGenericUDF().getUdfName().equals("UDFToString") - && node.getChildren().get(0).getTypeInfo() instanceof BaseCharTypeInfo) { - leaf = createNode(node.getChildren().get(0), inspector); - } else if (node.getGenericUDF().getUdfName().equals("like")) { - LikeExpressionProcessor likeExpressionProcessor = new LikeExpressionProcessor(); - leaf = likeExpressionProcessor.process(node, operator, inspector); - } else if (node.getGenericUDF().getUdfName().equals("substr")) { - if (simplify) { - SimplifySubstrExpressionProcessor simplifySubstrExpressionProcessor = - new SimplifySubstrExpressionProcessor(); - leaf = simplifySubstrExpressionProcessor.process(node, operator, inspector); - } else { - SubstrExpressionProcessor substrExpressionProcessor = new SubstrExpressionProcessor(); - leaf = substrExpressionProcessor.process(node, operator, inspector); - } - } else if (node.getGenericUDF().getUdfName().equals("UDFToDouble") - || node.getGenericUDF().getUdfName().equals("UDFToInteger") - || node.getGenericUDF().getUdfName().equals("UDFToLong") - || node.getGenericUDF().getUdfName().equals("UDFToShort") - || node.getGenericUDF().getUdfName().equals("UDFToByte") - || node.getGenericUDF().getUdfName().equals("UDFToBoolean")) { - leaf = createNode(node.getChildren().get(0), inspector); - CastFunctionExpression functionExpression = new CastFunctionExpression( - TypeUtils.convertHiveTypeToOmniType(node.getTypeInfo()), null, null, null); - return ExpressionUtils.optimizeCast(leaf, functionExpression); + switch (node.getGenericUDF().getUdfName()) { + case "UDFToString": + if (node.getChildren().get(0).getTypeInfo() instanceof BaseCharTypeInfo) { + leaf = createNode(node.getChildren().get(0), inspector); + } + break; + case "like": + LikeExpressionProcessor likeExpressionProcessor = new LikeExpressionProcessor(); + leaf = likeExpressionProcessor.process(node, operator, inspector); + break; + case "substr": + if (simplify) { + SimplifySubstrExpressionProcessor simplifySubstrExpressionProcessor = + new SimplifySubstrExpressionProcessor(); + leaf = simplifySubstrExpressionProcessor.process(node, operator, inspector); + } else { + SubstrExpressionProcessor substrExpressionProcessor = new SubstrExpressionProcessor(); + leaf = substrExpressionProcessor.process(node, operator, inspector); + } + break; + case "UDFToDouble": + case "UDFToInteger": + case "UDFToLong": + case "UDFToShort": + case "UDFToByte": + case "UDFToBoolean": + leaf = createNode(node.getChildren().get(0), inspector); + CastFunctionExpression functionExpression = new CastFunctionExpression( + TypeUtils.convertHiveTypeToOmniType(node.getTypeInfo()), null, null, null); + return ExpressionUtils.optimizeCast(leaf, functionExpression); + default: + break; } return leaf; } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/VecBatchWrapper.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/VecBatchWrapper.java index fad0bca9bf53ca57cd970bf77f33e89df15ad8cc..d1033229f550b44dcf89826c2e801cebd79a7930 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/VecBatchWrapper.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/reader/VecBatchWrapper.java @@ -25,10 +25,8 @@ public class VecBatchWrapper extends VectorizedRowBatch { } public void write(DataOutput var1) throws IOException { - return; } public void readFields(DataInput var1) throws IOException { - return; } } diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java index da3ecb219c9d776ce13ec6b393a14f7a74dad243..eb17143006c7ad0c37591f702e5b37b4eb6ea283 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java @@ -2,7 +2,6 @@ package com.huawei.boostkit.hive.shuffle; public class FixedWidthColumnSerDe implements ColumnSerDe { private int columnTypeLen; - private int length; private static byte[] EMPTY = new byte[16]; public FixedWidthColumnSerDe(int columnTypeLen) { @@ -35,7 +34,7 @@ public class FixedWidthColumnSerDe implements ColumnSerDe { return offset; } vecSerdeBody.isNull = 0; - length = bytes[offset]; + int length = bytes[offset]; ++offset; System.arraycopy(bytes, offset, vecSerdeBody.value, 0, length); System.arraycopy(EMPTY, 0, vecSerdeBody.value, length, columnTypeLen - length); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniKeySortableSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniKeySortableSerDe.java deleted file mode 100644 index 93fc2f1117ab6b03ec5ec9665376817f791f0336..0000000000000000000000000000000000000000 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniKeySortableSerDe.java +++ /dev/null @@ -1,970 +0,0 @@ -package com.huawei.boostkit.hive.shuffle; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.type.Date; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; -import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.AbstractSerDe; -import org.apache.hadoop.hive.serde2.ByteStream; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeSpec; -import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.binarysortable.InputByteBuffer; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DateWritableV2; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.HiveCharWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable; -import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable; -import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampLocalTZWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveIntervalDayTimeObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveIntervalYearMonthObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampLocalTZObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TimestampLocalTZTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigInteger; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -@SerDeSpec(schemaProps = {"columns", "columns.types", "serialization.sort.order", "serialization.sort.order.null"}) -public class OmniKeySortableSerDe extends AbstractSerDe { - public static final Logger LOG = LoggerFactory.getLogger(OmniKeySortableSerDe.class.getName()); - - public static final byte ZERO = (byte) 0; - public static final byte ONE = (byte) 1; - - List columnNames; - List columnTypes; - - TypeInfo rowTypeInfo; - StructObjectInspector rowObjectInspector; - - boolean[] columnSortOrderIsDesc; - byte[] columnNullMarker; - byte[] columnNotNullMarker; - - public static Charset decimalCharSet = Charset.forName("US-ASCII"); - - @Override - public void initialize(Configuration conf, Properties tbl) throws SerDeException { - - // Get column names and sort order - String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); - String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); - final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) - ? tbl.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) - : String.valueOf(SerDeUtils.COMMA); - if (columnNameProperty.length() == 0) { - columnNames = new ArrayList(); - } else { - columnNames = Arrays.asList(columnNameProperty.split(columnNameDelimiter)); - } - if (columnTypeProperty.length() == 0) { - columnTypes = new ArrayList(); - } else { - columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); - } - assert (columnNames.size() == columnTypes.size()); - - // Create row related objects - rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); - rowObjectInspector = (StructObjectInspector) TypeInfoUtils - .getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo); - row = new ArrayList(columnNames.size()); - for (int i = 0; i < columnNames.size(); i++) { - row.add(null); - } - - // Get the sort order - String columnSortOrder = tbl.getProperty(serdeConstants.SERIALIZATION_SORT_ORDER); - columnSortOrderIsDesc = new boolean[columnNames.size()]; - for (int i = 0; i < columnSortOrderIsDesc.length; i++) { - columnSortOrderIsDesc[i] = (columnSortOrder != null && columnSortOrder.charAt(i) == '-'); - } - - // Null first/last - String columnNullOrder = tbl.getProperty(serdeConstants.SERIALIZATION_NULL_SORT_ORDER); - columnNullMarker = new byte[columnNames.size()]; - columnNotNullMarker = new byte[columnNames.size()]; - for (int i = 0; i < columnSortOrderIsDesc.length; i++) { - if (columnSortOrderIsDesc[i]) { - // Descending - if (columnNullOrder != null && columnNullOrder.charAt(i) == 'a') { - // Null first - columnNullMarker[i] = ONE; - columnNotNullMarker[i] = ZERO; - } else { - // Null last (default for descending order) - columnNullMarker[i] = ZERO; - columnNotNullMarker[i] = ONE; - } - } else { - // Ascending - if (columnNullOrder != null && columnNullOrder.charAt(i) == 'z') { - // Null last - columnNullMarker[i] = ONE; - columnNotNullMarker[i] = ZERO; - } else { - // Null first (default for ascending order) - columnNullMarker[i] = ZERO; - columnNotNullMarker[i] = ONE; - } - } - } - } - - @Override - public Class getSerializedClass() { - return BytesWritable.class; - } - - @Override - public ObjectInspector getObjectInspector() throws SerDeException { - return rowObjectInspector; - } - - ArrayList row; - InputByteBuffer inputByteBuffer = new InputByteBuffer(); - - @Override - public Object deserialize(Writable blob) throws SerDeException { - BytesWritable data = (BytesWritable) blob; - inputByteBuffer.reset(data.getBytes(), 0, data.getLength()); - - try { - for (int i = 0; i < columnNames.size(); i++) { - row.set(i, deserialize(inputByteBuffer, columnTypes.get(i), - columnSortOrderIsDesc[i], columnNullMarker[i], columnNotNullMarker[i], row.get(i))); - } - } catch (IOException e) { - throw new SerDeException(e); - } - - return row; - } - - static Object deserialize(InputByteBuffer buffer, TypeInfo type, boolean invert, byte nullMarker, - byte notNullMarker, Object reuse) throws IOException { - // Is this field a null? - byte isNull = buffer.read(invert); - if (isNull == nullMarker) { - return null; - } - assert (isNull == notNullMarker); - - switch (type.getCategory()) { - case PRIMITIVE: { - PrimitiveTypeInfo ptype = (PrimitiveTypeInfo) type; - switch (ptype.getPrimitiveCategory()) { - case VOID: { - return null; - } - case BOOLEAN: { - BooleanWritable r = reuse == null ? new BooleanWritable() : (BooleanWritable) reuse; - byte b = buffer.read(invert); - assert (b == 1 || b == 2); - r.set(b == 2); - return r; - } - case BYTE: { - ByteWritable r = reuse == null ? new ByteWritable() : (ByteWritable) reuse; - r.set((byte) (buffer.read(invert) ^ 0x80)); - return r; - } - case SHORT: { - ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable) reuse; - int v = buffer.read(invert) ^ 0x80; - v = (v << 8) + (buffer.read(invert) & 0xff); - r.set((short) v); - return r; - } - case INT: { - IntWritable r = reuse == null ? new IntWritable() : (IntWritable) reuse; - r.set(deserializeInt(buffer, invert)); - return r; - } - case LONG: { - LongWritable r = reuse == null ? new LongWritable() : (LongWritable) reuse; - r.set(deserializeLong(buffer, invert)); - return r; - } - case FLOAT: { - FloatWritable r = reuse == null ? new FloatWritable() : (FloatWritable) reuse; - int v = 0; - for (int i = 0; i < 4; i++) { - v = (v << 8) + (buffer.read(invert) & 0xff); - } - if ((v & (1 << 31)) == 0) { - // negative number, flip all bits - v = ~v; - } else { - // positive number, flip the first bit - v = v ^ (1 << 31); - } - r.set(Float.intBitsToFloat(v)); - return r; - } - case DOUBLE: { - DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable) reuse; - long v = 0; - for (int i = 0; i < 8; i++) { - v = (v << 8) + (buffer.read(invert) & 0xff); - } - if ((v & (1L << 63)) == 0) { - // negative number, flip all bits - v = ~v; - } else { - // positive number, flip the first bit - v = v ^ (1L << 63); - } - r.set(Double.longBitsToDouble(v)); - return r; - } - case STRING: { - Text r = reuse == null ? new Text() : (Text) reuse; - return deserializeText(buffer, invert, r); - } - - case CHAR: { - HiveCharWritable r = reuse == null ? new HiveCharWritable() : (HiveCharWritable) reuse; - // Use internal text member to read value - deserializeText(buffer, invert, r.getTextValue()); - r.enforceMaxLength(getCharacterMaxLength(type)); - return r; - } - - case VARCHAR: { - HiveVarcharWritable r = reuse == null ? new HiveVarcharWritable() : (HiveVarcharWritable) reuse; - // Use HiveVarchar's internal Text member to read the value. - deserializeText(buffer, invert, r.getTextValue()); - // If we cache helper data for deserialization we could avoid having - // to call getVarcharMaxLength() on every deserialize call. - r.enforceMaxLength(getCharacterMaxLength(type)); - return r; - } - - case BINARY: { - BytesWritable bw = new BytesWritable(); - // Get the actual length first - int start = buffer.tell(); - int length = 0; - do { - byte b = buffer.read(invert); - if (b == 0) { - // end of string - break; - } - if (b == 1) { - // the last char is an escape char. read the actual char - buffer.read(invert); - } - length++; - } while (true); - - if (length == buffer.tell() - start) { - // No escaping happened, so we are already done. - bw.set(buffer.getData(), start, length); - } else { - // Escaping happened, we need to copy byte-by-byte. - // 1. Set the length first. - bw.set(buffer.getData(), start, length); - // 2. Reset the pointer. - buffer.seek(start); - // 3. Copy the data. - byte[] rdata = bw.getBytes(); - for (int i = 0; i < length; i++) { - byte b = buffer.read(invert); - if (b == 1) { - // The last char is an escape char, read the actual char. - // The serialization format escape \0 to \1, and \1 to \2, - // to make sure the string is null-terminated. - b = (byte) (buffer.read(invert) - 1); - } - rdata[i] = b; - } - // 4. Read the null terminator. - byte b = buffer.read(invert); - assert (b == 0); - } - return bw; - } - - case DATE: { - DateWritableV2 d = reuse == null ? new DateWritableV2() : (DateWritableV2) reuse; - d.set(deserializeInt(buffer, invert)); - return d; - } - - case TIMESTAMP: - TimestampWritableV2 t = (reuse == null - ? new TimestampWritableV2() - : (TimestampWritableV2) reuse); - byte[] bytes = new byte[TimestampWritableV2.BINARY_SORTABLE_LENGTH]; - - for (int i = 0; i < bytes.length; i++) { - bytes[i] = buffer.read(invert); - } - t.setBinarySortable(bytes, 0); - return t; - case TIMESTAMPLOCALTZ: - TimestampLocalTZWritable tstz = (reuse == null - ? new TimestampLocalTZWritable() - : (TimestampLocalTZWritable) reuse); - byte[] data = new byte[TimestampLocalTZWritable.BINARY_SORTABLE_LENGTH]; - for (int i = 0; i < data.length; i++) { - data[i] = buffer.read(invert); - } - // Across MR process boundary tz is normalized and stored in type - // and is not carried in data for each row. - tstz.fromBinarySortable(data, 0, ((TimestampLocalTZTypeInfo) type).timeZone()); - return tstz; - case INTERVAL_YEAR_MONTH: { - HiveIntervalYearMonthWritable i = reuse == null - ? new HiveIntervalYearMonthWritable() - : (HiveIntervalYearMonthWritable) reuse; - i.set(deserializeInt(buffer, invert)); - return i; - } - - case INTERVAL_DAY_TIME: { - HiveIntervalDayTimeWritable i = reuse == null - ? new HiveIntervalDayTimeWritable() - : (HiveIntervalDayTimeWritable) reuse; - long totalSecs = deserializeLong(buffer, invert); - int nanos = deserializeInt(buffer, invert); - i.set(totalSecs, nanos); - return i; - } - - case DECIMAL: { - // See serialization of decimal for explanation (below) - HiveDecimalWritable bdw = (reuse == null - ? new HiveDecimalWritable() - : (HiveDecimalWritable) reuse); - - int b = buffer.read(invert) - 1; - assert (b == 1 || b == -1 || b == 0); - boolean positive = b != -1; - - int factor = buffer.read(invert) ^ 0x80; - for (int i = 0; i < 3; i++) { - factor = (factor << 8) + (buffer.read(invert) & 0xff); - } - - if (!positive) { - factor = -factor; - } - - int start = buffer.tell(); - int length = 0; - - do { - b = buffer.read(positive ? invert : !invert); - assert (b != 1); - - if (b == 0) { - // end of digits - break; - } - - length++; - } while (true); - - final byte[] decimalBuffer = new byte[length]; - - buffer.seek(start); - for (int i = 0; i < length; ++i) { - decimalBuffer[i] = buffer.read(positive ? invert : !invert); - } - - // read the null byte again - buffer.read(positive ? invert : !invert); - - String digits = new String(decimalBuffer, 0, length, decimalCharSet); - BigInteger bi = new BigInteger(digits); - HiveDecimal bd = HiveDecimal.create(bi).scaleByPowerOfTen(factor - length); - - if (!positive) { - bd = bd.negate(); - } - - bdw.set(bd); - return bdw; - } - - default: { - throw new RuntimeException("Unrecognized type: " + ptype.getPrimitiveCategory()); - } - } - } - - case LIST: { - ListTypeInfo ltype = (ListTypeInfo) type; - TypeInfo etype = ltype.getListElementTypeInfo(); - - // Create the list if needed - ArrayList r = reuse == null ? new ArrayList() : (ArrayList) reuse; - - // Read the list - int size = 0; - while (true) { - int more = buffer.read(invert); - if (more == 0) { - // \0 to terminate - break; - } - // \1 followed by each element - assert (more == 1); - if (size == r.size()) { - r.add(null); - } - r.set(size, deserialize(buffer, etype, invert, nullMarker, notNullMarker, r.get(size))); - size++; - } - // Remove additional elements if the list is reused - while (r.size() > size) { - r.remove(r.size() - 1); - } - return r; - } - case MAP: { - MapTypeInfo mtype = (MapTypeInfo) type; - TypeInfo ktype = mtype.getMapKeyTypeInfo(); - TypeInfo vtype = mtype.getMapValueTypeInfo(); - - // Create the map if needed - Map r; - if (reuse == null || reuse.getClass() != LinkedHashMap.class) { - r = new LinkedHashMap(); - } else { - r = (Map) reuse; - r.clear(); - } - - while (true) { - int more = buffer.read(invert); - if (more == 0) { - // \0 to terminate - break; - } - // \1 followed by each key and then each value - assert (more == 1); - Object k = deserialize(buffer, ktype, invert, nullMarker, notNullMarker, null); - Object v = deserialize(buffer, vtype, invert, nullMarker, notNullMarker, null); - r.put(k, v); - } - return r; - } - case STRUCT: { - StructTypeInfo stype = (StructTypeInfo) type; - List fieldTypes = stype.getAllStructFieldTypeInfos(); - int size = fieldTypes.size(); - // Create the struct if needed - ArrayList r = reuse == null ? new ArrayList(size) : (ArrayList) reuse; - assert (r.size() <= size); - // Set the size of the struct - while (r.size() < size) { - r.add(null); - } - // Read one field by one field - for (int eid = 0; eid < size; eid++) { - r.set(eid, deserialize(buffer, fieldTypes.get(eid), invert, nullMarker, notNullMarker, r.get(eid))); - } - return r; - } - case UNION: { - UnionTypeInfo utype = (UnionTypeInfo) type; - StandardUnionObjectInspector.StandardUnion r = reuse == null - ? new StandardUnionObjectInspector.StandardUnion() - : (StandardUnionObjectInspector.StandardUnion) reuse; - // Read the tag - byte tag = buffer.read(invert); - r.setTag(tag); - r.setObject(deserialize(buffer, utype.getAllUnionObjectTypeInfos().get(tag), invert, nullMarker, - notNullMarker, null)); - return r; - } - default: { - throw new RuntimeException("Unrecognized type: " + type.getCategory()); - } - } - } - - private static int deserializeInt(InputByteBuffer buffer, boolean invert) throws IOException { - int v = buffer.read(invert) ^ 0x80; - for (int i = 0; i < 3; i++) { - v = (v << 8) + (buffer.read(invert) & 0xff); - } - return v; - } - - private static long deserializeLong(InputByteBuffer buffer, boolean invert) throws IOException { - long v = buffer.read(invert) ^ 0x80; - for (int i = 0; i < 7; i++) { - v = (v << 8) + (buffer.read(invert) & 0xff); - } - return v; - } - - static int getCharacterMaxLength(TypeInfo type) { - return ((BaseCharTypeInfo) type).getLength(); - } - - public static Text deserializeText(InputByteBuffer buffer, boolean invert, Text r) throws IOException { - // Get the actual length first - int start = buffer.tell(); - int length = 0; - do { - byte b = buffer.read(invert); - if (b == 0) { - // end of string - break; - } - if (b == 1) { - // the last char is an escape char. read the actual char - buffer.read(invert); - } - length++; - } while (true); - - if (length == buffer.tell() - start) { - // No escaping happened, so we are already done. - r.set(buffer.getData(), start, length); - } else { - // Escaping happened, we need to copy byte-by-byte. - // 1. Set the length first. - r.set(buffer.getData(), start, length); - // 2. Reset the pointer. - buffer.seek(start); - // 3. Copy the data. - byte[] rdata = r.getBytes(); - for (int i = 0; i < length; i++) { - byte b = buffer.read(invert); - if (b == 1) { - // The last char is an escape char, read the actual char. - // The serialization format escape \0 to \1, and \1 to \2, - // to make sure the string is null-terminated. - b = (byte) (buffer.read(invert) - 1); - } - rdata[i] = b; - } - // 4. Read the null terminator. - byte b = buffer.read(invert); - assert (b == 0); - } - return r; - } - - BytesWritable serializeBytesWritable = new BytesWritable(); - ByteStream.Output output = new ByteStream.Output(); - - @Override - public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - output.reset(); - StructObjectInspector soi = (StructObjectInspector) objInspector; - List fields = soi.getAllStructFieldRefs(); - - for (int i = 0; i < columnNames.size(); i++) { - serialize(output, soi.getStructFieldData(obj, fields.get(i)), fields.get(i).getFieldObjectInspector(), - columnSortOrderIsDesc[i], columnNullMarker[i], columnNotNullMarker[i]); - } - - serializeBytesWritable.set(output.getData(), 0, output.getLength()); - return serializeBytesWritable; - } - - public static void writeByte(ByteStream.RandomAccessOutput buffer, byte b, boolean invert) { - if (invert) { - b = (byte) (0xff ^ b); - } - buffer.write(b); - } - - static void serialize(ByteStream.Output buffer, Object o, ObjectInspector oi, boolean invert, byte nullMarker, - byte notNullMarker) throws SerDeException { - // Is this field a null? - if (o == null) { - writeByte(buffer, nullMarker, invert); - return; - } - // This field is not a null. - writeByte(buffer, notNullMarker, invert); - - switch (oi.getCategory()) { - case PRIMITIVE: { - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi; - switch (poi.getPrimitiveCategory()) { - case VOID: { - return; - } - case BOOLEAN: { - boolean v = ((BooleanObjectInspector) poi).get(o); - writeByte(buffer, (byte) (v ? 2 : 1), invert); - return; - } - case BYTE: { - ByteObjectInspector boi = (ByteObjectInspector) poi; - byte v = boi.get(o); - writeByte(buffer, (byte) (v ^ 0x80), invert); - return; - } - case SHORT: { - serializeShort(buffer, (Short) o, invert); - return; - } - case INT: { - serializeInt(buffer, (Integer) o, invert); - return; - } - case LONG: { - serializeLong(buffer, (Long) o, invert); - return; - } - case FLOAT: { - serializeFloat(buffer, (Float) o, invert); - return; - } - case DOUBLE: { - serializeDouble(buffer, (Double) o, invert); - return; - } - case STRING: { - StringObjectInspector soi = (StringObjectInspector) poi; - Text t = soi.getPrimitiveWritableObject(o); - serializeBytes(buffer, t.getBytes(), t.getLength(), invert); - return; - } - - case CHAR: - case VARCHAR: { - serializeBytes(buffer, ((Text) o).getBytes(), ((Text) o).getLength(), invert); - return; - } - - case BINARY: { - BinaryObjectInspector baoi = (BinaryObjectInspector) poi; - BytesWritable ba = baoi.getPrimitiveWritableObject(o); - byte[] toSer = new byte[ba.getLength()]; - System.arraycopy(ba.getBytes(), 0, toSer, 0, ba.getLength()); - serializeBytes(buffer, toSer, ba.getLength(), invert); - return; - } - case DATE: { - int v = Date.ofEpochDay((Integer) o).toEpochDay(); - serializeInt(buffer, v, invert); - return; - } - case TIMESTAMP: { - TimestampObjectInspector toi = (TimestampObjectInspector) poi; - TimestampWritableV2 t = toi.getPrimitiveWritableObject(o); - serializeTimestampWritable(buffer, t, invert); - return; - } - case TIMESTAMPLOCALTZ: { - TimestampLocalTZObjectInspector toi = (TimestampLocalTZObjectInspector) poi; - TimestampLocalTZWritable t = toi.getPrimitiveWritableObject(o); - serializeTimestampTZWritable(buffer, t, invert); - return; - } - case INTERVAL_YEAR_MONTH: { - HiveIntervalYearMonthObjectInspector ioi = (HiveIntervalYearMonthObjectInspector) poi; - HiveIntervalYearMonth intervalYearMonth = ioi.getPrimitiveJavaObject(o); - serializeHiveIntervalYearMonth(buffer, intervalYearMonth, invert); - return; - } - case INTERVAL_DAY_TIME: { - HiveIntervalDayTimeObjectInspector ioi = (HiveIntervalDayTimeObjectInspector) poi; - HiveIntervalDayTime intervalDayTime = ioi.getPrimitiveJavaObject(o); - serializeHiveIntervalDayTime(buffer, intervalDayTime, invert); - return; - } - case DECIMAL: { - serializeHiveDecimal(buffer, (HiveDecimal) o, invert); - return; - } - - default: { - throw new RuntimeException("Unrecognized type: " + poi.getPrimitiveCategory()); - } - } - } - case LIST: { - ListObjectInspector loi = (ListObjectInspector) oi; - ObjectInspector eoi = loi.getListElementObjectInspector(); - - // \1 followed by each element - int size = loi.getListLength(o); - for (int eid = 0; eid < size; eid++) { - writeByte(buffer, (byte) 1, invert); - serialize(buffer, loi.getListElement(o, eid), eoi, invert, nullMarker, notNullMarker); - } - // and \0 to terminate - writeByte(buffer, (byte) 0, invert); - return; - } - case MAP: { - MapObjectInspector moi = (MapObjectInspector) oi; - ObjectInspector koi = moi.getMapKeyObjectInspector(); - ObjectInspector voi = moi.getMapValueObjectInspector(); - - // \1 followed by each key and then each value - Map map = moi.getMap(o); - for (Map.Entry entry : map.entrySet()) { - writeByte(buffer, (byte) 1, invert); - serialize(buffer, entry.getKey(), koi, invert, nullMarker, notNullMarker); - serialize(buffer, entry.getValue(), voi, invert, nullMarker, notNullMarker); - } - // and \0 to terminate - writeByte(buffer, (byte) 0, invert); - return; - } - case STRUCT: { - StructObjectInspector soi = (StructObjectInspector) oi; - List fields = soi.getAllStructFieldRefs(); - - for (int i = 0; i < fields.size(); i++) { - serialize(buffer, soi.getStructFieldData(o, fields.get(i)), fields.get(i).getFieldObjectInspector(), - invert, nullMarker, notNullMarker); - } - return; - } - case UNION: { - UnionObjectInspector uoi = (UnionObjectInspector) oi; - byte tag = uoi.getTag(o); - writeByte(buffer, tag, invert); - serialize(buffer, uoi.getField(o), uoi.getObjectInspectors().get(tag), invert, nullMarker, - notNullMarker); - return; - } - default: { - throw new RuntimeException("Unrecognized type: " + oi.getCategory()); - } - } - - } - - public static void serializeBytes(ByteStream.Output buffer, byte[] data, int length, boolean invert) { - for (int i = 0; i < length; i++) { - if (data[i] == 0 || data[i] == 1) { - writeByte(buffer, (byte) 1, invert); - writeByte(buffer, (byte) (data[i] + 1), invert); - } else { - writeByte(buffer, data[i], invert); - } - } - writeByte(buffer, (byte) 0, invert); - } - - public static void serializeBytes(ByteStream.Output buffer, byte[] data, int offset, int length, boolean invert) { - for (int i = offset; i < offset + length; i++) { - if (data[i] == 0 || data[i] == 1) { - writeByte(buffer, (byte) 1, invert); - writeByte(buffer, (byte) (data[i] + 1), invert); - } else { - writeByte(buffer, data[i], invert); - } - } - writeByte(buffer, (byte) 0, invert); - } - - public static void serializeShort(ByteStream.Output buffer, short v, boolean invert) { - writeByte(buffer, (byte) ((v >> 8) ^ 0x80), invert); - writeByte(buffer, (byte) v, invert); - } - - public static void serializeInt(ByteStream.Output buffer, int v, boolean invert) { - writeByte(buffer, (byte) ((v >> 24) ^ 0x80), invert); - writeByte(buffer, (byte) (v >> 16), invert); - writeByte(buffer, (byte) (v >> 8), invert); - writeByte(buffer, (byte) v, invert); - } - - public static void serializeLong(ByteStream.Output buffer, long v, boolean invert) { - writeByte(buffer, (byte) ((v >> 56) ^ 0x80), invert); - writeByte(buffer, (byte) (v >> 48), invert); - writeByte(buffer, (byte) (v >> 40), invert); - writeByte(buffer, (byte) (v >> 32), invert); - writeByte(buffer, (byte) (v >> 24), invert); - writeByte(buffer, (byte) (v >> 16), invert); - writeByte(buffer, (byte) (v >> 8), invert); - writeByte(buffer, (byte) v, invert); - } - - public static void serializeFloat(ByteStream.Output buffer, float vf, boolean invert) { - int v = Float.floatToIntBits(vf); - if ((v & (1 << 31)) != 0) { - // negative number, flip all bits - v = ~v; - } else { - // positive number, flip the first bit - v = v ^ (1 << 31); - } - writeByte(buffer, (byte) (v >> 24), invert); - writeByte(buffer, (byte) (v >> 16), invert); - writeByte(buffer, (byte) (v >> 8), invert); - writeByte(buffer, (byte) v, invert); - } - - public static void serializeDouble(ByteStream.Output buffer, double vd, boolean invert) { - long v = Double.doubleToLongBits(vd); - if ((v & (1L << 63)) != 0) { - // negative number, flip all bits - v = ~v; - } else { - // positive number, flip the first bit - v = v ^ (1L << 63); - } - writeByte(buffer, (byte) (v >> 56), invert); - writeByte(buffer, (byte) (v >> 48), invert); - writeByte(buffer, (byte) (v >> 40), invert); - writeByte(buffer, (byte) (v >> 32), invert); - writeByte(buffer, (byte) (v >> 24), invert); - writeByte(buffer, (byte) (v >> 16), invert); - writeByte(buffer, (byte) (v >> 8), invert); - writeByte(buffer, (byte) v, invert); - } - - public static void serializeTimestampWritable(ByteStream.Output buffer, TimestampWritableV2 t, boolean invert) { - byte[] data = t.getBinarySortable(); - for (int i = 0; i < data.length; i++) { - writeByte(buffer, data[i], invert); - } - } - - public static void serializeTimestampTZWritable(ByteStream.Output buffer, TimestampLocalTZWritable t, - boolean invert) { - byte[] data = t.toBinarySortable(); - for (byte b : data) { - writeByte(buffer, b, invert); - } - } - - public static void serializeHiveIntervalYearMonth(ByteStream.Output buffer, HiveIntervalYearMonth intervalYearMonth, - boolean invert) { - int totalMonths = intervalYearMonth.getTotalMonths(); - serializeInt(buffer, totalMonths, invert); - } - - public static void serializeHiveIntervalDayTime(ByteStream.Output buffer, HiveIntervalDayTime intervalDayTime, - boolean invert) { - long totalSecs = intervalDayTime.getTotalSeconds(); - int nanos = intervalDayTime.getNanos(); - serializeLong(buffer, totalSecs, invert); - serializeInt(buffer, nanos, invert); - } - - // See comments for next method. - public static void serializeHiveDecimal(ByteStream.Output buffer, HiveDecimal dec, boolean invert) { - byte[] scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; - serializeHiveDecimal(buffer, dec, invert, scratchBuffer); - } - - /** - * Decimals are encoded in three pieces:Decimals are encoded in three pieces: - *

- * Sign: 1, 2 or 3 for smaller, equal or larger than 0 respectively - * Factor: Number that indicates the amount of digits you have to move - * the decimal point left or right until the resulting number is smaller - * than zero but has something other than 0 as the first digit. - * Digits: which is a string of all the digits in the decimal. If the number - * is negative the binary string will be inverted to get the correct ordering. - *

- * UNDONE: Is this example correct? - * Example: 0.00123 - * Sign is 3 (bigger than 0) - * Factor is -2 (move decimal point 2 positions right) - * Digits are: 123 - * - * @param buffer - * @param dec - * @param invert - * @param scratchBuffer - */ - public static void serializeHiveDecimal(ByteStream.Output buffer, HiveDecimal dec, boolean invert, - byte[] scratchBuffer) { - // Get the sign of the decimal. - int signum = dec.signum(); - - // Get the 10^N power to turn digits into the desired decimal with a possible - // fractional part. - // To be compatible with the OldHiveDecimal version, zero has factor 1. - int factor; - if (signum == 0) { - factor = 1; - } else { - factor = dec.rawPrecision() - dec.scale(); - } - - // To make comparisons work properly, the "factor" gets the decimal's sign, too. - factor = signum == 1 ? factor : -factor; - - // Convert just the decimal digits (no dot, sign, etc) into bytes. - // - // This is much faster than converting the BigInteger value from unscaledValue() which is no - // longer part of the HiveDecimal representation anymore to string, then bytes. - int index = dec.toDigitsOnlyBytes(scratchBuffer); - - /* - * Finally write out the pieces (sign, power, digits) - */ - writeByte(buffer, (byte) (signum + 1), invert); - writeByte(buffer, (byte) ((factor >> 24) ^ 0x80), invert); - writeByte(buffer, (byte) (factor >> 16), invert); - writeByte(buffer, (byte) (factor >> 8), invert); - writeByte(buffer, (byte) factor, invert); - - // The toDigitsOnlyBytes stores digits at the end of the scratch buffer. - serializeBytes(buffer, scratchBuffer, index, scratchBuffer.length - index, signum == -1 ? !invert : invert); - } - - // A HiveDecimalWritable version. - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } -} \ No newline at end of file