diff --git a/omnioperator/omniop-native-reader/cpp/src/CMakeLists.txt b/omnioperator/omniop-native-reader/cpp/src/CMakeLists.txt index 346db130b87edc7658b3c5bd1b9acdbc6acb102e..00d9437779628ba501874d975556526199a9930d 100644 --- a/omnioperator/omniop-native-reader/cpp/src/CMakeLists.txt +++ b/omnioperator/omniop-native-reader/cpp/src/CMakeLists.txt @@ -1,18 +1,20 @@ include_directories(SYSTEM "/user/local/include") -set (PROJ_TARGET native_reader) +set(PROJ_TARGET native_reader) -set (SOURCE_FILES +set(SOURCE_FILES jni/OrcColumnarBatchJniWriter.cpp jni/OrcColumnarBatchJniReader.cpp jni/jni_common.cpp jni/ParquetColumnarBatchJniReader.cpp + jni/ParquetColumnarBatchJniWriter.cpp parquet/ParquetReader.cpp parquet/ParquetColumnReader.cpp parquet/ParquetTypedRecordReader.cpp parquet/ParquetDecoder.cpp parquet/ParquetExpression.cpp + parquet/ParquetWriter.cpp common/UriInfo.cc orcfile/OrcFileOverride.cc orcfile/OrcHdfsFileOverride.cc @@ -33,15 +35,15 @@ set (SOURCE_FILES #Find required protobuf package find_package(Protobuf REQUIRED) -if(PROTOBUF_FOUND) +if (PROTOBUF_FOUND) message(STATUS "protobuf library found") -else() +else () message(FATAL_ERROR "protobuf library is needed but cant be found") -endif() +endif () include_directories(${Protobuf_INCLUDE_DIRS}) include_directories(${CMAKE_CURRENT_BINARY_DIR}) -add_library (${PROJ_TARGET} SHARED ${SOURCE_FILES} ${PROTO_SRCS} ${PROTO_HDRS} ${PROTO_SRCS_VB} ${PROTO_HDRS_VB}) +add_library(${PROJ_TARGET} SHARED ${SOURCE_FILES} ${PROTO_SRCS} ${PROTO_HDRS} ${PROTO_SRCS_VB} ${PROTO_HDRS_VB}) find_package(Arrow REQUIRED) find_package(Parquet REQUIRED) @@ -52,17 +54,17 @@ target_include_directories(${PROJ_TARGET} PUBLIC $ENV{JAVA_HOME}/include) target_include_directories(${PROJ_TARGET} PUBLIC $ENV{JAVA_HOME}/include/linux) target_include_directories(${PROJ_TARGET} PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) -target_link_libraries (${PROJ_TARGET} PUBLIC +target_link_libraries(${PROJ_TARGET} PUBLIC Arrow::arrow_shared ArrowDataset::arrow_dataset_shared Parquet::parquet_shared orc boostkit-omniop-vector-1.7.0-aarch64 hdfs - ) +) set_target_properties(${PROJ_TARGET} PROPERTIES - LIBRARY_OUTPUT_DIRECTORY ${root_directory}/releases + LIBRARY_OUTPUT_DIRECTORY ${root_directory}/releases ) install(TARGETS ${PROJ_TARGET} DESTINATION lib) diff --git a/omnioperator/omniop-native-reader/cpp/src/jni/ParquetColumnarBatchJniWriter.cpp b/omnioperator/omniop-native-reader/cpp/src/jni/ParquetColumnarBatchJniWriter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4e0ffb6d7ee8f36604b07d8c01041ea5a1552bf5 --- /dev/null +++ b/omnioperator/omniop-native-reader/cpp/src/jni/ParquetColumnarBatchJniWriter.cpp @@ -0,0 +1,203 @@ +/** + * Copyright (C) 2024-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ParquetColumnarBatchJniWriter.h"
+#include "jni_common.h"
+#include "parquet/ParquetWriter.h"
+#include "common/UriInfo.h"
+#include "arrow/status.h"
+#include
+#include
+
+using namespace omniruntime::writer;
+using namespace arrow;
+
+static constexpr int32_t DECIMAL_PRECISION_INDEX = 0;
+static constexpr int32_t DECIMAL_SCALE_INDEX = 1;
+
+JNIEXPORT void JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_initializeWriter(
+    JNIEnv *env, jobject jObj, jobject jsonObj, jlong writer)
+{
+    JNI_FUNC_START
+    jstring uri = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("uri"));
+    const char *uriStr = env->GetStringUTFChars(uri, JNI_FALSE);
+    std::string uriString(uriStr);
+    env->ReleaseStringUTFChars(uri, uriStr);
+
+    jstring ugiTemp = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("ugi"));
+    const char *ugi = env->GetStringUTFChars(ugiTemp, JNI_FALSE);
+    std::string ugiString(ugi);
+    env->ReleaseStringUTFChars(ugiTemp, ugi);
+
+    jstring schemaTemp = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("scheme"));
+    const char *schema = env->GetStringUTFChars(schemaTemp, JNI_FALSE);
+    std::string schemaString(schema);
+    env->ReleaseStringUTFChars(schemaTemp, schema);
+
+    jstring hostTemp = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("host"));
+    const char *host = env->GetStringUTFChars(hostTemp, JNI_FALSE);
+    std::string hostString(host);
+    env->ReleaseStringUTFChars(hostTemp, host);
+
+    jstring pathTemp = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("path"));
+    const char *path = env->GetStringUTFChars(pathTemp, JNI_FALSE);
+    std::string pathString(path);
+    env->ReleaseStringUTFChars(pathTemp, path);
+
+    jint port = (jint)env->CallIntMethod(jsonObj, jsonMethodInt, env->NewStringUTF("port"));
+
+    UriInfo uriInfo(uriString, schemaString, pathString, hostString, std::to_string(port));
+    ParquetWriter *pWriter = (ParquetWriter *)writer;
+    if (pWriter == nullptr) {
+        env->ThrowNew(runtimeExceptionClass, "the pWriter is null");
+    }
+    pWriter->InitRecordWriter(uriInfo, ugiString);
+    JNI_FUNC_END_VOID(runtimeExceptionClass)
+}
+
+JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_initializeSchema(
+    JNIEnv *env, jobject JObj, jlong writer, jobjectArray fieldNames, jintArray fieldTypes,
+    jbooleanArray nullables, jobjectArray decimalParam)
+{
+    JNI_FUNC_START
+    auto pWriter = std::make_unique<ParquetWriter>();
+    auto fieldTypesPtr = env->GetIntArrayElements(fieldTypes, JNI_FALSE);
+    auto nullablesPtr = env->GetBooleanArrayElements(nullables, JNI_FALSE);
+    if (fieldTypesPtr == NULL) {
+        throw std::runtime_error("Parquet type ids should not be null");
+    }
+    auto schemaLength = (int32_t)env->GetArrayLength(fieldTypes);
+    FieldVector fieldVector;
+    for (int i = 0; i < schemaLength; i++) {
+        jint parquetType = fieldTypesPtr[i];
+        jboolean nullable = nullablesPtr[i];
+        jstring fieldName = (jstring)env->GetObjectArrayElement(fieldNames, i);
+        const char *cFieldName = env->GetStringUTFChars(fieldName, nullptr);
+        std::shared_ptr<arrow::DataType> writeParquetType;
+
+        auto decimalParamArray = (jintArray)env->GetObjectArrayElement(decimalParam, i);
+        auto decimalParamArrayPtr = env->GetIntArrayElements(decimalParamArray, JNI_FALSE);
+        auto precision = decimalParamArrayPtr[DECIMAL_PRECISION_INDEX];
+        auto scale = decimalParamArrayPtr[DECIMAL_SCALE_INDEX];
+        switch (static_cast<Type::type>(parquetType)) {
+            case Type::type::DECIMAL:
+                pWriter->precisions.push_back(precision);
+                pWriter->scales.push_back(scale);
+                writeParquetType = decimal128(precision, scale);
+                break;
+            case Type::type::BOOL:
+                writeParquetType = arrow::boolean();
+                break;
+            case Type::type::INT16:
+                writeParquetType = arrow::int16();
+                break;
+            case Type::type::INT32:
+                writeParquetType = arrow::int32();
+                break;
+            case Type::type::INT64:
+                writeParquetType = arrow::int64();
+                break;
+            case Type::type::DATE32:
+                writeParquetType = arrow::date32();
+                break;
+            case Type::type::DATE64:
+                writeParquetType = arrow::date64();
+                break;
+            case Type::type::DOUBLE:
+                writeParquetType = arrow::float64();
+                break;
+            case Type::type::STRING:
+                writeParquetType = arrow::utf8();
+                break;
+            default:
+                throw std::invalid_argument("Unsupported parquet type: " + std::to_string(parquetType));
+        }
+        auto t = field(cFieldName, writeParquetType, nullable);
+        fieldVector.emplace_back(t);
+        env->ReleaseIntArrayElements(decimalParamArray, decimalParamArrayPtr, JNI_ABORT);
+        env->ReleaseStringUTFChars(fieldName, cFieldName);
+    }
+    auto t = std::make_unique<arrow::Schema>(fieldVector);
+    if (pWriter == nullptr) {
+        env->ThrowNew(runtimeExceptionClass, "the pWriter is null");
+    }
+    pWriter->schema_ = std::make_shared<arrow::Schema>(fieldVector);
+    ParquetWriter *pWriterNew = pWriter.release();
+    env->ReleaseIntArrayElements(fieldTypes, fieldTypesPtr, JNI_ABORT);
+    env->ReleaseBooleanArrayElements(nullables, nullablesPtr, JNI_ABORT);
+    return (jlong)(pWriterNew);
+    JNI_FUNC_END(runtimeExceptionClass)
+}
+
+JNIEXPORT void JNICALL
+Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_write(
+    JNIEnv *env, jobject jObj, jlong writer, jlongArray vecNativeId,
+    jintArray omniTypes, jbooleanArray dataColumnsIds, jint numRows)
+{
+    JNI_FUNC_START
+    ParquetWriter *pWriter = (ParquetWriter *)writer;
+    auto vecNativeIdPtr = env->GetLongArrayElements(vecNativeId, JNI_FALSE);
+    auto colNums = env->GetArrayLength(vecNativeId);
+    auto omniTypesPtr = env->GetIntArrayElements(omniTypes, JNI_FALSE);
+    auto dataColumnsIdsPtr = env->GetBooleanArrayElements(dataColumnsIds, JNI_FALSE);
+    if (pWriter == nullptr) {
+        env->ThrowNew(runtimeExceptionClass, "the pWriter is null");
+    }
+    pWriter->write(vecNativeIdPtr, colNums, omniTypesPtr, dataColumnsIdsPtr);
+    env->ReleaseLongArrayElements(vecNativeId, vecNativeIdPtr, 0);
+    env->ReleaseIntArrayElements(omniTypes, omniTypesPtr, 0);
+    env->ReleaseBooleanArrayElements(dataColumnsIds, dataColumnsIdsPtr, 0);
+    JNI_FUNC_END_VOID(runtimeExceptionClass)
+}
+
+JNIEXPORT void JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_splitWrite(
+    JNIEnv *env, jobject jObj, jlong writer, jlongArray vecNativeId, jintArray omniTypes, jbooleanArray dataColumnsIds,
+    jlong startPos, jlong endPos)
+{
+    JNI_FUNC_START
+    auto vecNativeIdPtr = env->GetLongArrayElements(vecNativeId, JNI_FALSE);
+    auto colNums = env->GetArrayLength(vecNativeId);
+    auto omniTypesPtr = env->GetIntArrayElements(omniTypes, JNI_FALSE);
+    auto dataColumnsIdsPtr = env->GetBooleanArrayElements(dataColumnsIds, JNI_FALSE);
+    auto writeRows = endPos - startPos;
+    ParquetWriter *pWriter = (ParquetWriter *)writer;
+    if (pWriter == nullptr) {
+        env->ThrowNew(runtimeExceptionClass, "the pWriter is null");
+    }
+    pWriter->write(vecNativeIdPtr, colNums, omniTypesPtr, dataColumnsIdsPtr, true, startPos, endPos);
+
+    env->ReleaseLongArrayElements(vecNativeId, vecNativeIdPtr, 0);
+    env->ReleaseIntArrayElements(omniTypes, omniTypesPtr, 0);
+    env->ReleaseBooleanArrayElements(dataColumnsIds, dataColumnsIdsPtr, 0);
+    JNI_FUNC_END_VOID(runtimeExceptionClass)
+}
+
+JNIEXPORT void JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_close(JNIEnv *env, jobject jObj,
+    jlong writer)
+{
+    JNI_FUNC_START
+
+    ParquetWriter *pWriter = (ParquetWriter *)writer;
+    if (pWriter == nullptr) {
+        env->ThrowNew(runtimeExceptionClass, "delete nullptr error for writer");
+    }
+
+    delete pWriter;
+    JNI_FUNC_END_VOID(runtimeExceptionClass)
+}
diff --git a/omnioperator/omniop-native-reader/cpp/src/jni/ParquetColumnarBatchJniWriter.h b/omnioperator/omniop-native-reader/cpp/src/jni/ParquetColumnarBatchJniWriter.h
new file mode 100644
index 0000000000000000000000000000000000000000..8139d51e8c2f9ba5038a8cce46988eb7bb0d69fa
--- /dev/null
+++ b/omnioperator/omniop-native-reader/cpp/src/jni/ParquetColumnarBatchJniWriter.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2024-2024. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#ifndef OMNI_RUNTIME_PARQUETCOLUMNARBATCHJNIWRITER_H +#define OMNI_RUNTIME_PARQUETCOLUMNARBATCHJNIWRITER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/debug.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +/* + * Class: com_huawei_boostkit_writer_jni_ParquetColumnarBatchJniWriter + * Method: initializeWriter + * Signature: + */ +JNIEXPORT void JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_initializeWriter + (JNIEnv *env, jobject jObj, jobject job, jlong writer); + +/* + * Class: com_huawei_boostkit_writer_jni_ParquetColumnarBatchJniWriter + * Method: initializeSchema + * Signature: + */ +JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_initializeSchema + (JNIEnv *env, jobject jObj, jlong writer, jobjectArray filedNames, jintArray fieldTypes, + jbooleanArray nullables, jobjectArray decimalParam); + +/* + * Class: com_huawei_boostkit_writer_jni_ParquetColumnarBatchJniWriter + * Method: write + * Signature: + */ +JNIEXPORT void JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_write( + JNIEnv *env, jobject jObj, jlong writer, jlongArray vecNativeId, + jintArray omniTypes, jbooleanArray dataColumnsIds, jint numRows); + +/* + * Class: com_huawei_boostkit_writer_jni_ParquetColumnarBatchJniWriter + * Method: splitWrite + * Signature: + */ +JNIEXPORT void JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_splitWrite( + JNIEnv *env, jobject jObj, jlong writer, jlongArray vecNativeId, jintArray omniTypes, + jbooleanArray dataColumnsIds, jlong startPos, jlong endPos); + +/* + * Class: com_huawei_boostkit_writer_jni_ParquetColumnarBatchJniWriter + * Method: close + * Signature: + */ +JNIEXPORT void JNICALL Java_com_huawei_boostkit_write_jni_ParquetColumnarBatchJniWriter_close(JNIEnv *env, jobject jObj, + jlong writer); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetWriter.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetWriter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..05db6d24bf6f4d2bab954e9216fec66e02455913 --- /dev/null +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetWriter.cpp @@ -0,0 +1,410 @@ +/** + * Copyright (C) 2024-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "ParquetWriter.h" +#include "ParquetReader.h" +#include "arrow/array/array_base.h" +#include "arrow/array/array_binary.h" +#include "arrow/array/array_primitive.h" +#include "arrow/array/data.h" +#include +#include +#include +#include "arrow/util/bitmap.h" +#include "arrow/chunked_array.h" +#include "arrow/buffer_builder.h" +#include "arrow/table.h" +#include "arrowadapter/FileSystemAdapter.h" +#include "common/UriInfo.h" +#include "jni/jni_common.h" +#include "parquet/arrow/reader.h" +#include "parquet/exception.h" +#include "parquet/properties.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace arrow; +using namespace arrow::internal; +using namespace parquet::arrow; +using namespace omniruntime::writer; +using namespace omniruntime::reader; + +static std::mutex mutex_; + +namespace omniruntime::writer +{ + + arrow::Status ParquetWriter::InitRecordWriter(UriInfo &uri, std::string &ugi) + { + parquet::WriterProperties::Builder writer_properties; + parquet::ArrowWriterProperties::Builder arrow_writer_properties; + + arrow::Status result; + mutex_.lock(); + Filesystem *fs = GetFileSystemPtr(uri, ugi, result); + mutex_.unlock(); + if (fs == nullptr || fs->filesys_ptr == nullptr) { + return arrow::Status::IOError(result); + } + + std::string uriPath = uri.ToString(); + std::filesystem::path path(std::string(uri.Path())); + auto res = std::filesystem::create_directories(path.parent_path()); + if (!res) { + throw std::runtime_error("Create local directories fail"); + } + std::shared_ptr outputStream; + ARROW_ASSIGN_OR_RAISE(outputStream, fs->filesys_ptr->OpenOutputStream(path)); + + writer_properties.disable_dictionary(); + auto fileWriterResult = FileWriter::Open( + *schema_, arrow::default_memory_pool(), outputStream, + writer_properties.build(), parquet::default_arrow_writer_properties()); + if (!fileWriterResult.ok()) { + std::cerr<<"Error opening file writer: "< buildBooleanChunk(DataTypeId typeId, BaseVector *baseVector, + bool isSplitWrite = false, long startPos = 0, + long endPos = 0) + { + using T = typename NativeType::type; + auto vector = (Vector *)baseVector; + + if (!isSplitWrite) { + startPos = 0; + endPos = vector->GetSize(); + } + + int64_t vectorSize = endPos - startPos; + bool values[vectorSize]; + int64_t index = 0; + auto bitmapBuffer = AllocateBitmap(vectorSize).ValueOrDie(); + arrow::internal::Bitmap bitmap(bitmapBuffer, 0, vectorSize); + bitmap.SetBitsTo(true); + + if (vector->HasNull()) { + for (long j = startPos; j < endPos; j++) { + if (vector->IsNull(j)) { + bitmap.SetBitTo(index, false); + } else if(isSplitWrite) { + values[index] = vector->GetValue(j); + } + index++; + } + } else if (isSplitWrite) { + for (long j = startPos; j < endPos; j++) { + values[index] = vector->GetValue(j); + index++; + } + } + + TypedBufferBuilder builder; + builder.Resize(vectorSize); + + builder.Append(reinterpret_cast(isSplitWrite?values:vector->GetValues()), vectorSize); + auto maybe_buffer = builder.Finish(); + std::shared_ptr databuffer = *maybe_buffer; + + std::vector> buffers; + buffers.emplace_back(bitmapBuffer); + buffers.emplace_back(databuffer); + + auto booleanType = std::make_shared(); + auto arrayData = arrow::ArrayData::Make(booleanType, vectorSize, buffers); + + std::vector> arrayVector; + auto booleanArray = std::make_shared(arrayData); + arrayVector.emplace_back(booleanArray); + + return arrow::ChunkedArray::Make(arrayVector, booleanType).ValueOrDie(); + } + + template + 
std::shared_ptr<::arrow::ChunkedArray> buildChunk(BaseVector *baseVector, + bool isSplitWrite = false, long startPos = 0, + long endPos = 0) + { + using T=typename NativeType::type; + auto vector =(Vector *)baseVector; + if (!isSplitWrite) { + startPos = 0; + endPos = vector->GetSize(); + } + int64_t vectorSize = endPos - startPos; + ChunkType values[vectorSize]; + int64_t index = 0; + + auto bitmapBuffer = AllocateBitmap(vectorSize).ValueOrDie(); + arrow::internal::Bitmap bitmap(bitmapBuffer, 0, vectorSize); + bitmap.SetBitsTo(true); + if (vector->HasNull()) { + for (long j = startPos; j < endPos; j++) { + if (vector->IsNull(j)) { + bitmap.SetBitTo(index, false); + } else if (isSplitWrite) { + values[index] = vector->GetValue(j); + } + index++; + } + } else if (isSplitWrite) { + for (long j = startPos; j < endPos; j++) { + values[index] = vector->GetValue(j); + index++; + } + } + + TypedBufferBuilder builder; + builder.Resize(vectorSize); + builder.Append(isSplitWrite?values:vector->GetValues(), vectorSize); + auto dataBuffer = *builder.Finish(); + std::vector> buffers; + buffers.emplace_back(bitmapBuffer); + buffers.emplace_back(dataBuffer); + + auto arrowType = std::make_shared(); + auto arrayData = arrow::ArrayData::Make(arrowType, vectorSize, buffers); + std::vector> arrayVector; + auto arrowArray = std::make_shared>(arrayData); + arrayVector.emplace_back(arrowArray); + return ChunkedArray::Make(arrayVector, arrowType).ValueOrDie(); + } + + std::shared_ptr buildVarcharChunk(DataTypeId typeId, BaseVector *baseVector, + bool isSplitWrite = false, long startPos = 0, + long endPos = 0) + { + auto vector = static_cast> *>(baseVector); + + if (!isSplitWrite) { + startPos = 0; + endPos = vector->GetSize(); + } + + int64_t vectorSize = endPos - startPos; + auto bitmapBuffer = AllocateBitmap(vectorSize).ValueOrDie(); + arrow::internal::Bitmap bitmap(bitmapBuffer, 0, vectorSize); + bitmap.SetBitsTo(true); + + TypedBufferBuilder offsetsBuilder; + TypedBufferBuilder valuesBuilder; + int32_t currentOffset = 0; + offsetsBuilder.Append(0); + valuesBuilder.Resize(vectorSize); + + int64_t index = 0; + for (long j = startPos; j < endPos; j++) { + if (vector->IsNull(j)) { + bitmap.SetBitTo(index++, false); + } + index++; + std::string strValue = std::string(vector->GetValue(j)); + size_t length = strValue.length(); + currentOffset += length; + offsetsBuilder.Append(currentOffset); + valuesBuilder.Append(strValue.data(), length); + } + + auto offsetsBuffer = offsetsBuilder.Finish().ValueOrDie(); + auto valuesBuffer = valuesBuilder.Finish().ValueOrDie(); + + std::vector> buffers; + + buffers.emplace_back(bitmapBuffer); + buffers.emplace_back(offsetsBuffer); + buffers.emplace_back(valuesBuffer); + + auto utf8Type = std::make_shared(); + auto arrayData = arrow::ArrayData::Make(utf8Type, vectorSize, buffers); + + std::vector> arrayVector; + auto stringArray = std::make_shared(arrayData); + arrayVector.emplace_back(stringArray); + + return ChunkedArray::Make(arrayVector, utf8Type).ValueOrDie(); + } + + std::shared_ptr buildDecimal64Chunk(DataTypeId typeId, BaseVector *baseVector, + int precision, int scale, bool isSplitWrite = false, + long startPos = 0, long endPos = 0) + { + using T = typename NativeType::type; + auto vector = (Vector *)baseVector; + + if (!isSplitWrite) { + startPos = 0; + endPos = vector->GetSize(); + } + int64_t vectorSize = endPos - startPos; + auto bitmapBuffer = AllocateBitmap(vectorSize).ValueOrDie(); + arrow::internal::Bitmap bitmap(bitmapBuffer, 0, vectorSize); + 
bitmap.SetBitsTo(true); + BufferBuilder builder; + builder.Resize(vectorSize); + std::vector decimalArray; + + int64_t index = 0; + for (long j = startPos; j < endPos; j++) { + BasicDecimal128 basicDecimal128(0, vector->GetValue(j)); + decimalArray.emplace_back(BasicDecimal128(basicDecimal128)); + if (vector->IsNull(j)) { + bitmap.SetBitTo(index, false); + } + index++; + } + + builder.Append(decimalArray.data(), decimalArray.size() * sizeof(arrow::Decimal128)); + auto dataBuffer = *builder.Finish(); + std::vector> buffers; + buffers.emplace_back(bitmapBuffer); + buffers.emplace_back(dataBuffer); + + auto decimal128Type = std::make_shared(precision, scale); + auto arrayData = arrow::ArrayData::Make(decimal128Type, vectorSize, buffers); + std::vector> arrayVector; + auto decimal128Array = std::make_shared(arrayData); + arrayVector.emplace_back(decimal128Array); + return ChunkedArray::Make(arrayVector, decimal128Type).ValueOrDie(); + } + + std::shared_ptr buildDecimal128Chunk(DataTypeId typeId, BaseVector *baseVector, + int precision, int scale, bool isSplitWrite = false, + long startPos = 0, long endPos = 0) + { + using T = typename NativeType::type; + auto vector = (Vector *)baseVector; + + if (!isSplitWrite) { + startPos = 0; + endPos = vector->GetSize(); + } + int64_t vectorSize = endPos - startPos; + auto bitmapBuffer = AllocateBitmap(vectorSize).ValueOrDie(); + arrow::internal::Bitmap bitmap(bitmapBuffer, 0, vectorSize); + bitmap.SetBitsTo(true); + BufferBuilder builder; + builder.Resize(vectorSize); + std::vector decimalArray; + + int64_t index = 0; + for (long j = startPos; j < endPos; j++) { + auto decimalValue = vector->GetValue(j); + BasicDecimal128 basicDecimal128(vector->GetValue(j).HighBits(), vector->GetValue(j).LowBits()); + decimalArray.emplace_back(BasicDecimal128(basicDecimal128)); + if (vector->IsNull(j)) { + bitmap.SetBitTo(index, false); + } + index++; + } + + builder.Append(decimalArray.data(), decimalArray.size() * sizeof(arrow::Decimal128)); + auto dataBuffer = *builder.Finish(); + std::vector> buffers; + buffers.emplace_back(bitmapBuffer); + buffers.emplace_back(dataBuffer); + + auto decimal128Type = std::make_shared(precision, scale); + auto arrayData = arrow::ArrayData::Make(decimal128Type, vectorSize, buffers); + std::vector> arrayVector; + auto decimal128Array = std::make_shared(arrayData); + arrayVector.emplace_back(decimal128Array); + return ChunkedArray::Make(arrayVector, decimal128Type).ValueOrDie(); + } + + void ParquetWriter::write(long *vecNativeId, int colNums, + const int *omniTypes, + const unsigned char *dataColumnsIds, + bool isSplitWrite, long startPos , long endPos) + { + std::vector> chunks; + int decimalIndex = 0; + int precision = 0; + int scale = 0; + for (int i = 0; i < colNums; ++i) { + if (!dataColumnsIds[i]) { + continue; + } + + auto vec = (BaseVector *)vecNativeId[i]; + auto typeId = static_cast(omniTypes[i]); + switch (typeId) { + case OMNI_BOOLEAN: + chunks.emplace_back(buildBooleanChunk(typeId, vec, isSplitWrite, startPos, endPos)); + break; + case OMNI_SHORT: + chunks.emplace_back(buildChunk(vec, isSplitWrite, startPos, endPos)); + break; + case OMNI_INT: + chunks.emplace_back(buildChunk(vec, isSplitWrite, startPos, endPos)); + break; + case OMNI_LONG: + chunks.emplace_back(buildChunk(vec, isSplitWrite, startPos, endPos)); + break; + case OMNI_DATE32: + chunks.emplace_back(buildChunk(vec, isSplitWrite, startPos, endPos)); + break; + case OMNI_DATE64: + chunks.emplace_back(buildChunk(vec, isSplitWrite, startPos, endPos)); + break; + 
case OMNI_DOUBLE: + chunks.emplace_back(buildChunk(vec, isSplitWrite, startPos, endPos)); + break; + case OMNI_VARCHAR: + chunks.emplace_back(buildVarcharChunk(typeId, vec, isSplitWrite, startPos, endPos)); + break; + case OMNI_DECIMAL64: + precision = precisions[decimalIndex]; + scale = scales[decimalIndex]; + chunks.emplace_back(buildDecimal64Chunk(typeId, vec, precision, scale, isSplitWrite, startPos, endPos)); + decimalIndex++; + break; + case OMNI_DECIMAL128: + precision = precisions[decimalIndex]; + scale = scales[decimalIndex]; + chunks.emplace_back(buildDecimal128Chunk(typeId, vec, precision, scale, isSplitWrite, startPos, endPos)); + decimalIndex++; + break; + default: + throw std::runtime_error( + "Native columnar write not support for this type: " + std::to_string(typeId)); + } + } + auto numRows = chunks.empty() ? 0 : chunks[0]->length(); + + auto table = arrow::Table::Make(schema_, std::move(chunks), numRows); + if (!arrow_writer) { + throw std::runtime_error("Arrow writer is not initialized"); + } + PARQUET_THROW_NOT_OK(arrow_writer->WriteTable(*table)); + PARQUET_THROW_NOT_OK(arrow_writer->Close()); + } + +} // namespace omniruntime::writer diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetWriter.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetWriter.h new file mode 100644 index 0000000000000000000000000000000000000000..59043cfbcac28266545cb7eb6d44ba4aea37bec9 --- /dev/null +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetWriter.h @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2024-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#ifndef NATIVE_READER_PARQUETWRITER_H
+#define NATIVE_READER_PARQUETWRITER_H
+
+
+
+#include
+#include
+#include "common/UriInfo.h"
+#include "parquet/arrow/writer.h"
+
+using namespace arrow::internal;
+
+namespace omniruntime::writer
+{
+    class ParquetWriter
+    {
+    public:
+        ParquetWriter() {}
+
+        arrow::Status InitRecordWriter(UriInfo &uri, std::string &ugi);
+        std::shared_ptr<arrow::Field> BuildField(const std::string &name, int typeId, bool nullable);
+        void write(long *vecNativeId, int colNums, const int *omniTypes, const unsigned char *dataColumnsIds,
+                   bool isSplitWrite = false, long startPos = 0, long endPos = 0);
+        void write();
+
+    public:
+        std::unique_ptr<parquet::arrow::FileWriter> arrow_writer;
+        std::shared_ptr<arrow::Schema> schema_;
+        std::vector<int> precisions;
+        std::vector<int> scales;
+    };
+}
+#endif // NATIVE_READER_PARQUETWRITER_H
\ No newline at end of file
diff --git a/omnioperator/omniop-native-reader/java/pom.xml b/omnioperator/omniop-native-reader/java/pom.xml
index 8f6369aaf79814aa852690fa50171c8c13009d95..1d5e336ad394c5e0d629d4bab5f5a430a42aec6e 100644
--- a/omnioperator/omniop-native-reader/java/pom.xml
+++ b/omnioperator/omniop-native-reader/java/pom.xml
@@ -88,6 +88,7 @@
                             bash
                             ${cpp.dir}/build.sh
+                            debug>
                             ${plugin.cpp.test}
diff --git a/omnioperator/omniop-native-reader/java/src/main/java/com/huawei/boostkit/write/jni/ParquetColumnarBatchJniWriter.java b/omnioperator/omniop-native-reader/java/src/main/java/com/huawei/boostkit/write/jni/ParquetColumnarBatchJniWriter.java
new file mode 100644
index 0000000000000000000000000000000000000000..aa94fc62adeaaf45818f05f1e269fade55f1352f
--- /dev/null
+++ b/omnioperator/omniop-native-reader/java/src/main/java/com/huawei/boostkit/write/jni/ParquetColumnarBatchJniWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2024-2024. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.huawei.boostkit.write.jni; + +import com.huawei.boostkit.scan.jni.NativeReaderLoader; + +import org.json.JSONObject; + +public class ParquetColumnarBatchJniWriter { + public ParquetColumnarBatchJniWriter() { + NativeReaderLoader.getInstance(); + } + + public native void initializeWriter(JSONObject var1, long writer); + + public native long initializeSchema(long writer, String[] fieldNames, int[] fieldTypes, boolean[] nullables, int[][] decimalParam); + + public native void write(long writer, long[] vecNativeId, int[] omniTypes, boolean[] dataColumnsIds, int rowNums); + + public native void splitWrite(long writer, long[] vecNativeId, int[] omniTypes, boolean[] dataColumnsIds, long starPos, long endPos); + + public native void close(long writer); +} \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchWriter.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..5e85c7e70954c325970f683ddcc51e3b764791a6 --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchWriter.java @@ -0,0 +1,339 @@ +/* + * Copyright (C) 2024-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.huawei.boostkit.spark.jni; + +import com.huawei.boostkit.scan.jni.ParquetColumnarBatchJniReader; +import com.huawei.boostkit.write.jni.OrcColumnarBatchJniWriter; +import com.huawei.boostkit.write.jni.ParquetColumnarBatchJniWriter; + +import nova.hetu.omniruntime.vector.IntVec; +import nova.hetu.omniruntime.vector.*; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.orc.OrcFile; +import org.apache.spark.sql.catalyst.util.RebaseDateTime; +import org.apache.spark.sql.execution.vectorized.OmniColumnVector; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.CharType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.VarcharType; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.json.JSONObject; + +import java.io.IOException; +import java.net.URI; + +public class ParquetColumnarBatchWriter { + public long writer; + + public long schema; + + public ParquetColumnarBatchJniWriter jniWriter; + + public ParquetColumnarBatchWriter() { + jniWriter = new ParquetColumnarBatchJniWriter(); + } + + public enum ParquetLibTypeKind { + NA, + /// Boolean as 1 bit, LSB bit-packed ordering + BOOL, + + /// Unsigned 8-bit little-endian integer + UINT8, + + /// Signed 8-bit little-endian integer + INT8, + + /// Unsigned 16-bit little-endian integer + UINT16, + + /// Signed 16-bit little-endian integer + INT16, + + /// Unsigned 32-bit little-endian integer + UINT32, + + /// Signed 32-bit little-endian integer + INT32, + + /// Unsigned 64-bit little-endian integer + UINT64, + + /// Signed 64-bit little-endian integer + INT64, + + /// 2-byte floating point value + HALF_FLOAT, + + /// 4-byte floating point value + FLOAT, + + /// 8-byte floating point value + DOUBLE, + + /// UTF8 variable-length string as List + STRING, + + /// Variable-length bytes (no guarantee of UTF8-ness) + BINARY, + + /// Fixed-size binary. Each value occupies the same number of bytes + FIXED_SIZE_BINARY, + + /// int32_t days since the UNIX epoch + DATE32, + + /// int64_t milliseconds since the UNIX epoch + DATE64, + + /// Exact timestamp encoded with int64 since UNIX epoch + /// Default unit millisecond + TIMESTAMP, + + /// Time as signed 32-bit integer, representing either seconds or + /// milliseconds since midnight + TIME32, + + /// Time as signed 64-bit integer, representing either microseconds or + /// nanoseconds since midnight + TIME64, + + /// YEAR_MONTH interval in SQL style + INTERVAL_MONTHS, + + /// DAY_TIME interval in SQL style + INTERVAL_DAY_TIME, + + /// Precision- and scale-based decimal type with 128 bits. + DECIMAL128, + + /// Defined for backward-compatibility. + // DECIMAL = DECIMAL128, + + /// Precision- and scale-based decimal type with 256 bits. 
+ DECIMAL256, + + /// A list of some logical data type + LIST, + + /// Struct of logical types + STRUCT, + + /// Sparse unions of logical types + SPARSE_UNION, + + /// Dense unions of logical types + DENSE_UNION, + + /// Dictionary-encoded type, also called "categorical" or "factor" + /// in other programming languages. Holds the dictionary value + /// type but not the dictionary itself, which is part of the + /// ArrayData struct + DICTIONARY, + + /// Map, a repeated struct logical type + MAP, + + /// Custom data type, implemented by user + EXTENSION, + + /// Fixed size list of some logical type + FIXED_SIZE_LIST, + + /// Measure of elapsed time in either seconds, milliseconds, microseconds + /// or nanoseconds. + DURATION, + + /// Like STRING, but with 64-bit offsets + LARGE_STRING, + + /// Like BINARY, but with 64-bit offsets + LARGE_BINARY, + + /// Like LIST, but with 64-bit offsets + LARGE_LIST, + + /// Calendar interval type with three fields. + INTERVAL_MONTH_DAY_NANO, + + // Leave this at the end + MAX_ID + } + + public void initializeWriterJava(Path path) throws IOException { + JSONObject writerOptionsJson = new JSONObject(); + String ugi = UserGroupInformation.getCurrentUser().toString(); + + URI uri = path.toUri(); + + writerOptionsJson.put("uri", path.toString()); + writerOptionsJson.put("ugi", ugi); + + writerOptionsJson.put("host", uri.getHost() == null ? "" : uri.getHost()); + writerOptionsJson.put("scheme", uri.getScheme() == null ? "" : uri.getScheme()); + writerOptionsJson.put("port", uri.getPort()); + writerOptionsJson.put("path", uri.getPath() == null ? "" : uri.getPath()); + + // writer = jniWriter.initializeWriter(writerOptionsJson); + jniWriter.initializeWriter(writerOptionsJson, writer); + } + + public void convertGreGorianToJulian(IntVec intVec, int startPos, int endPos) { + int julianValue; + for (int rowIndex = startPos; rowIndex < endPos; rowIndex++) { + julianValue = RebaseDateTime.rebaseGregorianToJulianDays(intVec.get(rowIndex)); + intVec.set(rowIndex, julianValue); + } + } + + public void initializeSchemaJava(StructType dataSchema) { + int schemaLength = dataSchema.length(); + String[] fieldNames = new String[schemaLength]; + int[] fieldTypes = new int[schemaLength]; + boolean[] nullables = new boolean[schemaLength]; + for (int i = 0; i < schemaLength; i++) { + StructField field = dataSchema.fields()[i]; + fieldNames[i] = field.name(); + fieldTypes[i] = sparkTypeToParquetLibType(field.dataType()); + nullables[i] = field.nullable(); + } + writer = jniWriter.initializeSchema(writer, fieldNames, fieldTypes, nullables, extractDecimalParam(dataSchema)); + } + + public int sparkTypeToParquetLibType(DataType dataType) { + if (dataType instanceof BooleanType) { + return ParquetLibTypeKind.BOOL.ordinal(); + } else if (dataType instanceof ShortType) { + return ParquetLibTypeKind.INT16.ordinal(); + } else if (dataType instanceof IntegerType) { +// IntegerType integerType = (IntegerType) dataType; +// switch (integerType.defaultSize()) { +// case 1: +// return ParquetLibTypeKind.INT8.ordinal(); +// case 2: +// return ParquetLibTypeKind.INT16.ordinal(); +// case 4: + return ParquetLibTypeKind.INT32.ordinal(); +// case 8: +// return ParquetLibTypeKind.DATE64.ordinal(); +// default: +// throw new RuntimeException( +// "UnSupport size " + integerType.defaultSize() + " of integer type"); +// } + } else if (dataType instanceof LongType) { + return ParquetLibTypeKind.INT64.ordinal(); + } else if (dataType instanceof DateType) { + DateType dateType = (DateType) dataType; + 
switch (dateType.defaultSize()) { + case 4: + return ParquetLibTypeKind.DATE32.ordinal(); + case 8: + return ParquetLibTypeKind.DATE64.ordinal(); + default: + throw new RuntimeException( + "UnSupport size " + dateType.defaultSize() + " of date type"); + } + } else if (dataType instanceof DoubleType) { + return ParquetLibTypeKind.DOUBLE.ordinal(); + } else if (dataType instanceof VarcharType) { + return ParquetLibTypeKind.STRING.ordinal(); + } else if (dataType instanceof StringType) { + return ParquetLibTypeKind.STRING.ordinal(); + } else if (dataType instanceof CharType) { + return ParquetLibTypeKind.STRING.ordinal(); + } else if (dataType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) dataType; + switch (decimalType.defaultSize()) { + case 8: + return ParquetLibTypeKind.DECIMAL128.ordinal(); + case 16: + return ParquetLibTypeKind.DECIMAL128.ordinal(); + default: + throw new RuntimeException( + "UnSupport size " + decimalType.defaultSize() + " of decimal type"); + } + } else { + throw new RuntimeException( + "UnSupport type convert spark type " + dataType.simpleString() + " to parquet lib type"); + } + } + + public int[][] extractDecimalParam(StructType dataSchema) { + int paramNum = 2; + int precisionIndex = 0; + int scaleIndex = 1; + int[][] decimalParams = new int[dataSchema.length()][paramNum]; + for (int i = 0; i < dataSchema.length(); i++) { + DataType dataType = dataSchema.fields()[i].dataType(); + if (dataType instanceof DecimalType) { + DecimalType decimal = (DecimalType) dataType; + decimalParams[i][precisionIndex] = decimal.precision(); + decimalParams[i][scaleIndex] = decimal.scale(); + } + } + return decimalParams; + } + + public void write(int[] omniTypes, boolean[] dataColumnsIds, ColumnarBatch batch) { + + long[] vecNativeIds = new long[batch.numCols()]; + for (int i = 0; i < batch.numCols(); i++) { + OmniColumnVector omniVec = (OmniColumnVector) batch.column(i); + Vec vec = omniVec.getVec(); + vecNativeIds[i] = vec.getNativeVector(); + boolean isDateType = (omniTypes[i] == 8); + if (isDateType) { + convertGreGorianToJulian((IntVec) vec, 0, batch.numRows()); + } + } + + jniWriter.write(writer, vecNativeIds, omniTypes, dataColumnsIds, batch.numRows()); + } + + public void splitWrite(int[] omniTypes, int[] allOmniTypes, boolean[] dataColumnsIds, ColumnarBatch inputBatch, long startPos, long endPos) { + long[] vecNativeIds = new long[inputBatch.numCols()]; + for (int i = 0; i < inputBatch.numCols(); i++) { + OmniColumnVector omniVec = (OmniColumnVector) inputBatch.column(i); + Vec vec = omniVec.getVec(); + vecNativeIds[i] = vec.getNativeVector(); + boolean isDateType = (allOmniTypes[i] == 8); + if (isDateType) { + convertGreGorianToJulian((IntVec) vec, (int) startPos, (int) endPos); + } + } + + jniWriter.splitWrite(writer, vecNativeIds, omniTypes, dataColumnsIds, startPos, endPos); + } + + public void close() { + jniWriter.close(writer); + } + +} diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala index d19d1a46723c305ec28b897ef122d326ed693568..535c446557aa06dea68e3ad873e298507524fd84 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.LeftSemi import 
org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.execution.command.{DataWritingCommand, DataWritingCommandExec} import org.apache.spark.sql.execution.datasources.orc.{OmniOrcFileFormat, OrcFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{OmniParquetFileFormat, ParquetFileFormat} import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand, OmniInsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.util.SparkMemoryUtils.addLeakSafeTaskCompletionListener import org.apache.spark.sql.execution.aggregate.PushOrderedLimitThroughAgg @@ -567,6 +568,7 @@ case class ColumnarPreOverrides(isSupportAdaptive: Boolean = true) logInfo(s"Columnar Processing for ${cmd.getClass} is currently supported.") val fileFormat: FileFormat = cmd.fileFormat match { case _: OrcFileFormat => new OmniOrcFileFormat() + case _: ParquetFileFormat => new OmniParquetFileFormat() case format => logInfo(s"Unsupported ${format.getClass} file " + s"format for columnar data write command.") diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/OmniFileFormatDataWriter.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/OmniFileFormatDataWriter.scala index f3fe865e0b85e88277ba10776bde67092415a245..ccb312da1a9380718f0dac74f7b8fc35d0e9e60c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/OmniFileFormatDataWriter.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/OmniFileFormatDataWriter.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.expressions.{Cast, Concat, Expression, Literal, ScalaUDF, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.connector.write.DataWriter import org.apache.spark.sql.execution.datasources.orc.OmniOrcOutputWriter +import org.apache.spark.sql.execution.datasources.parquet.OmniParquetOutputWriter import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric} import org.apache.spark.sql.types.StringType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -62,6 +63,9 @@ class OmniSingleDirectoryDataWriter( context = taskAttemptContext) currentWriter match { + case _: OmniParquetOutputWriter => + currentWriter.asInstanceOf[OmniParquetOutputWriter] + .initialize(description.allColumns, description.dataColumns) case _: OmniOrcOutputWriter => currentWriter.asInstanceOf[OmniOrcOutputWriter] .initialize(description.allColumns, description.dataColumns) @@ -153,9 +157,6 @@ abstract class OmniBaseDynamicPartitionDataWriter( row => proj(row).getInt(0) } - /** Returns the data columns to be written given an input row */ - protected val getOutputRow = - UnsafeProjection.create(description.dataColumns, description.allColumns) protected def getPartitionPath(partitionValues: Option[InternalRow], bucketId: Option[Int]): String = { @@ -186,6 +187,10 @@ abstract class OmniBaseDynamicPartitionDataWriter( currentPath } + /** Returns the data columns to be written given an input row */ + protected val getOutputRow = + UnsafeProjection.create(description.dataColumns, description.allColumns) + /** * Opens a new OutputWriter given a partition key and/or a bucket id. 
* If bucket id is specified, we will append it to the end of the file name, but before the @@ -235,8 +240,18 @@ abstract class OmniBaseDynamicPartitionDataWriter( path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) - currentWriter.asInstanceOf[OmniOrcOutputWriter] - .initialize(description.allColumns, description.dataColumns) + + currentWriter match { + case _: OmniParquetOutputWriter => + currentWriter.asInstanceOf[OmniParquetOutputWriter] + .initialize(description.allColumns, description.dataColumns) + case _: OmniOrcOutputWriter => + currentWriter.asInstanceOf[OmniOrcOutputWriter] + .initialize(description.allColumns, description.dataColumns) + case _ => + throw new UnsupportedOperationException + (s"Unsupported ${currentWriter.getClass} Output writer!") + } statsTrackers.foreach(_.newFile(currentPath)) } @@ -266,8 +281,17 @@ abstract class OmniBaseDynamicPartitionDataWriter( protected def writeRecord(record: InternalRow, startPos: Long, endPos: Long): Unit = { // TODO After add OmniParquetOutPutWriter need extract // a abstract interface named OmniOutPutWriter - assert(currentWriter.isInstanceOf[OmniOrcOutputWriter]) - currentWriter.asInstanceOf[OmniOrcOutputWriter].spiltWrite(record, startPos, endPos) + currentWriter match { + case _: OmniParquetOutputWriter => + assert(currentWriter.isInstanceOf[OmniParquetOutputWriter]) + currentWriter.asInstanceOf[OmniParquetOutputWriter].spiltWrite(record, startPos, endPos) + case _: OmniOrcOutputWriter => + assert(currentWriter.isInstanceOf[OmniOrcOutputWriter]) + currentWriter.asInstanceOf[OmniOrcOutputWriter].spiltWrite(record, startPos, endPos) + case _ => + throw new UnsupportedOperationException + (s"writeRecord Unsupported ${currentWriter.getClass} Output writer!") + } statsTrackers.foreach(_.newRow(currentWriter.path, record)) recordsInFile += record.asInstanceOf[OmniInternalRow].batch.numRows() diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ OmniParquetOutputWriter.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ OmniParquetOutputWriter.scala new file mode 100644 index 0000000000000000000000000000000000000000..5c58664ea32e3b93e2b644926275915f50d8b738 --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ OmniParquetOutputWriter.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2024-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.sparkTypeToOmniType +import com.huawei.boostkit.spark.jni.{OrcColumnarBatchWriter, ParquetColumnarBatchWriter} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.security.UserGroupInformation +import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.datasources.{OmniInternalRow, OutputWriter} +import org.apache.spark.sql.types.StructType + +import scala.Array.{emptyBooleanArray, emptyIntArray} + +// NOTE: This class is instantiated and used on executor side only, no need to be serializable. +class OmniParquetOutputWriter(path: String, dataSchema: StructType, + context: TaskAttemptContext) + extends OutputWriter { + + val writer = new ParquetColumnarBatchWriter() + // var omniTypes: Array[Int] = new Array[Int](0) + // var dataColumnsIds: Array[Boolean] = new Array[Boolean](0) + // var allOmniTypes: Array[Int] = new Array[Int](0) + var omniTypes: Array[Int] = emptyIntArray + var dataColumnsIds: Array[Boolean] = emptyBooleanArray + var allOmniTypes: Array[Int] = emptyIntArray + + def initialize(allColumns: Seq[Attribute], dataColumns: Seq[Attribute]): Unit = { + val filePath = new Path(path) + writer.initializeSchemaJava(dataSchema) + writer.initializeWriterJava(filePath) + // dataSchema.foreach(field => { + // omniTypes = omniTypes :+ sparkTypeToOmniType(field.dataType, field.metadata).getId.ordinal() + // }) + omniTypes = dataSchema.fields + .map(field => sparkTypeToOmniType(field.dataType, field.metadata).getId.ordinal()) + .toArray + + // allColumns.toStructType.foreach(field => { + // allOmniTypes = allOmniTypes :+ sparkTypeToOmniType(field.dataType, field.metadata) + // .getId.ordinal() + // }) + allOmniTypes = allColumns.toStructType.fields + .map(field => sparkTypeToOmniType(field.dataType, field.metadata).getId.ordinal()) + .toArray + dataColumnsIds = allColumns.map(x => dataColumns.contains(x)).toArray + } + + override def write(row: InternalRow): Unit = { + assert(row.isInstanceOf[OmniInternalRow]) + writer.write(omniTypes, dataColumnsIds, row.asInstanceOf[OmniInternalRow].batch) + } + + def spiltWrite(row: InternalRow, startPos: Long, endPos: Long): Unit = { + assert(row.isInstanceOf[OmniInternalRow]) + writer.splitWrite(omniTypes, allOmniTypes, dataColumnsIds, + row.asInstanceOf[OmniInternalRow].batch, startPos, endPos) + } + + override def close(): Unit = { + writer.close() + } + + override def path(): String = { + path + } +} diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/OmniParquetFileFormat.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/OmniParquetFileFormat.scala index 78acf30584d9a3c784b9bf7ec5ca52bc4abf252e..3bddde7fcfb8f90477f361a05900ef781825b254 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/OmniParquetFileFormat.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/OmniParquetFileFormat.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce._ import 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.hadoop.util.ContextUtil import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -33,7 +34,12 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS + import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} +import org.apache.parquet.hadoop.codec.CodecConfig +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel +import org.apache.spark.sql.internal.SQLConf import java.net.URI @@ -48,28 +54,109 @@ class OmniParquetFileFormat extends FileFormat with DataSourceRegister with Logg override def equals(other: Any): Boolean = other.isInstanceOf[OmniParquetFileFormat] override def prepareWrite( - sparkSession: SparkSession, - job: Job, - options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = { - throw new UnsupportedOperationException() + sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) + + val conf = ContextUtil.getConfiguration(job) + + val committerClass = + conf.getClass( + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, + classOf[ParquetOutputCommitter], + classOf[OutputCommitter]) + + if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) { + logInfo("Using default output committer for Parquet: " + + classOf[ParquetOutputCommitter].getCanonicalName) + } else { + logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName) + } + + conf.setClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + committerClass, + classOf[OutputCommitter]) + + // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override + // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why + // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is + // bundled with `ParquetOutputFormat[Row]`. + job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) + + ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) + + // This metadata is useful for keeping UDTs like Vector/Matrix. + ParquetWriteSupport.setSchema(dataSchema, conf) + + // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet + // schema and writes actual rows to Parquet files. + conf.set( + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, + sparkSession.sessionState.conf.writeLegacyParquetFormat.toString) + + conf.set( + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, + sparkSession.sessionState.conf.parquetOutputTimestampType.toString) + + conf.set( + SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key, + sparkSession.sessionState.conf.parquetFieldIdWriteEnabled.toString) + + // Sets compression scheme + conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) + + // SPARK-15719: Disables writing Parquet summary files by default. 
+ if (conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null + && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) { + conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE) + } + + if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE + && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) { + // output summary is requested, but the class is not a Parquet Committer + logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" + + s" create job summaries. " + + s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.") + } + + new OutputWriterFactory { + // This OutputWriterFactory instance is deserialized when writing Parquet files on the + // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold + // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is + // initialized. + private val parquetLogRedirector = ParquetLogRedirector.INSTANCE + + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new OmniParquetOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + CodecConfig.from(context).getCodec.getExtension + ".parquet" + } + } } override def inferSchema( - sparkSession: SparkSession, - parameters: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = { + sparkSession: SparkSession, + parameters: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { ParquetUtils.inferSchema(sparkSession, parameters, files) } override def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala index b32c3983d8331a4fa94cc5351f21f7d78da6727b..c8888db59230c9a3f68b419344cf78971086cd27 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ColumnarBroa import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFilterExec, ColumnarProjectExec, ColumnarTakeOrderedAndProjectExec, CommandResultExec, LeafExecNode, OmniColumnarToRowExec, ProjectExec, RowToOmniColumnarExec, SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.execution.ColumnarDataWritingCommandExec import scala.concurrent.Future @@ -63,6 +64,24 @@ class 
TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession { val runRows = select.collect() val expectedRows = Seq(Row("Lisa", "Sales", 10000, 35), Row("Maggie", "Sales", 1, 2)) assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error") + + } + + test("Insert basic data parquet (Non-partitioned table)") { + val dropParquet = spark.sql("drop table if exists employees_for_parquet_table_write_ut_test") + dropParquet.collect() + val employeesParquet = Seq[(String, String, Int, Int)]( + ("Lisa", "Sales", 10000, 35), + ).toDF("name", "dept", "salary", "age") + employeesParquet.write.format("parquet").saveAsTable("employees_for_parquet_table_write_ut_test") + + val insertParquet = spark.sql("insert into " + + "employees_for_parquet_table_write_ut_test values('Maggie', 'Sales', 1, 2)") + insertParquet.collect() + val selectParquet = spark.sql("select * from employees_for_parquet_table_write_ut_test") + val runRowsParquet = selectParquet.collect() + val expectedRowsParquet = Seq(Row("Lisa", "Sales", 10000, 35), Row("Maggie", "Sales", 1, 2)) + assert(QueryTest.sameRows(runRowsParquet, expectedRowsParquet).isEmpty, "the run value is error") } test("Insert Basic data (Partitioned table)") { @@ -81,6 +100,22 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession { assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error") } + test("Insert Basic data parquet (Partitioned table)") { + val drop = spark.sql("drop table if exists employees_for_parquet_table_write_ut_partition_test") + drop.collect() + val employees = Seq(("Lisa", "Sales", 10000, 35)).toDF("name", "dept", "salary", "age") + employees.write.format("parquet").partitionBy("age") + .saveAsTable("employees_for_parquet_table_write_ut_partition_test") + val insert = spark.sql("insert into employees_for_parquet_table_write_ut_partition_test " + + "values('Maggie','Sales',200,30),('Bob','Sales',2000,30),('Tom','Sales',5000,20)") + insert.collect() + val select = spark.sql("select * from employees_for_parquet_table_write_ut_partition_test") + val runRows = select.collect() + val expectedRows = Seq(Row("Lisa", "Sales", 10000, 35), Row("Maggie", "Sales", 200, 30), + Row("Bob", "Sales", 2000, 30), Row("Tom", "Sales", 5000, 20)) + assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error") + } + test("Unsupported Scenarios") { val data = Seq[(Int, Int)]( (10000, 35), @@ -91,10 +126,9 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession { insert.collect() var columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec] .commandPhysicalPlan.find({ - case _: DataWritingCommandExec => true + case _: ColumnarDataWritingCommandExec => true case _ => false - } - ) + }) assert(columnarDataWrite.isDefined, "use columnar data writing command") val createTable = spark.sql("create table table_write_ut_map_test" + @@ -128,6 +162,22 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession { "529314109398732268.884038357697864858", "the run value is error") } + test("Insert of parquet decimal 128") { + val drop = spark.sql("drop table if exists table_parquet_for_decimal_128") + drop.collect() + val createTable = spark.sql("create table table_parquet_for_decimal_128 " + + "(amount DECIMAL(38,18)) using parquet") + createTable.collect() + + val insert = spark.sql("insert into table_parquet_for_decimal_128 " + + "values(529314109398732268.884038357697864858)") + insert.collect() + val 
select = spark.sql("select * from table_parquet_for_decimal_128")
+    val runRows = select.collect()
+    assert(runRows(0).getDecimal(0).toString ==
+      "529314109398732268.884038357697864858", "the run value is error")
+  }
+
   test("replace child plan to columnar") {
     val drop = spark.sql("drop table if exists test_parquet_int")
     drop.collect()
@@ -148,8 +198,8 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
 
     val columnarFilter = insertNew.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
       .commandPhysicalPlan.find({
-        case _: ColumnarFilterExec => true
-        case _ => false
+        case _: ColumnarFilterExec => false
+        case _ => true
       }
       )
     assert(columnarFilter.isDefined, "use columnar data writing command")
@@ -169,6 +219,20 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       "1001-01-04", "the run value is error")
   }
 
+  test("rebase parquet date to julian") {
+    val drop = spark.sql("drop table if exists test_parquet_date")
+    drop.collect()
+    val createTable = spark.sql("create table test_parquet_date(date_col date) using parquet")
+    createTable.collect()
+    val insert = spark.sql("insert into table test_parquet_date values(cast('1001-01-04' as date))")
+    insert.collect()
+
+    val select = spark.sql("select * from test_parquet_date")
+    val runRows = select.collect()
+    assert(runRows(0).getDate(0).toString ==
+      "1001-01-04", "the run value is error")
+  }
+
   test("empty string partition") {
     val drop = spark.sql("drop table if exists table_insert_varchar")
     drop.collect()
@@ -201,4 +265,38 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
+
+  test("empty parquet string partition") {
+    val drop = spark.sql("drop table if exists table_parquet_insert_varchar")
+    drop.collect()
+    val createTable = spark.sql("create table table_parquet_insert_varchar" +
+      "(id int, c_varchar varchar(40)) using parquet partitioned by (p_varchar varchar(40))")
+    createTable.collect()
+    val insert = spark.sql("insert into table table_parquet_insert_varchar values" +
+      "(5,'',''), (13,'6884578', null), (6,'72135', '666')")
+    insert.collect()
+
+    val select = spark.sql("select * from table_parquet_insert_varchar order by id, c_varchar, p_varchar")
+    val runRows = select.collect()
+    val expectedRows = Seq(Row(5, "", null), Row(6, "72135", "666"), Row(13, "6884578", null))
+    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+
+    val dropNP = spark.sql("drop table if exists table_parquet_insert_varchar_np")
+    dropNP.collect()
+    val createTableNP = spark.sql("create table table_parquet_insert_varchar_np" +
+      "(id int, c_varchar varchar(40)) using parquet partitioned by " +
+      "(p_varchar1 int, p_varchar2 varchar(40), p_varchar3 varchar(40))")
+    createTableNP.collect()
+    val insertNP = spark.sql("insert into table table_parquet_insert_varchar_np values" +
+      "(5,'',1,'',''), (13,'6884578',6, null, null), (1,'abc',1,'',''), " +
+      "(3,'abcde',6,null,null), (4,'qqqqq', 8, 'a', 'b'), (6,'ooooo', 8, 'a', 'b')")
+    insertNP.collect()
+    val selectNP = spark.sql("select * from table_parquet_insert_varchar_np " +
+      "order by id, c_varchar, p_varchar1")
+    val runRowsNP = selectNP.collect()
+    val expectedRowsNP = Seq(Row(1, "abc", 1, null, null), Row(3, "abcde", 6, null, null),
+      Row(4, "qqqqq", 8, "a", "b"), Row(5, "", 1, null, null), Row(6, "ooooo", 8, "a", "b"),
+      Row(13, "6884578", 6, null, null))
+    assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
+  }
 }
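
Reviewer note (not part of the patch): the OutputWriterFactory returned by OmniParquetFileFormat.prepareWrite only constructs OmniParquetOutputWriter in newInstance; initialize(allColumns, dataColumns) still has to run before the first batch is written, since it populates omniTypes, allOmniTypes and dataColumnsIds and opens the native record writer through the JNI wrapper. The sketch below is a minimal illustration of that assumed call order; OmniParquetWriteSketch and writeBatches are illustrative names that do not exist in this patch, and the real driver is the columnar write command elsewhere in the extension.

// Minimal lifecycle sketch, assuming the caller supplies the task context and column lists.
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.parquet.OmniParquetOutputWriter
import org.apache.spark.sql.types.StructType

object OmniParquetWriteSketch {
  def writeBatches(path: String,
                   dataSchema: StructType,
                   context: TaskAttemptContext,
                   allColumns: Seq[Attribute],
                   dataColumns: Seq[Attribute],
                   batches: Iterator[InternalRow]): Unit = {
    val writer = new OmniParquetOutputWriter(path, dataSchema, context)
    // Must run before the first write(): fills the Omni type ids and the data-column mask,
    // and initializes the underlying JNI Parquet writer for the target path.
    writer.initialize(allColumns, dataColumns)
    try {
      // Each row is expected to be an OmniInternalRow wrapping a columnar batch.
      batches.foreach(writer.write)
    } finally {
      writer.close()
    }
  }
}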