Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flink-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ under the License.
<artifactId>flink-shaded-guava</artifactId>
</dependency>

<!-- Bitmap internal implementation -->
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>1.3.0</version>
</dependency>

<!-- ================== test dependencies ================== -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeinfo;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BitmapSerializer;
import org.apache.flink.types.bitmap.Bitmap;

/** Type information for {@link Bitmap}. */
@PublicEvolving
public class BitmapTypeInfo extends TypeInformation<Bitmap> {

private static final long serialVersionUID = 1L;

public static final BitmapTypeInfo INSTANCE = new BitmapTypeInfo();

private BitmapTypeInfo() {}

@Override
public boolean isBasicType() {
return false;
}

@Override
public boolean isTupleType() {
return false;
}

@Override
public int getArity() {
return 1;
}

@Override
public int getTotalFields() {
return 1;
}

@Override
public Class<Bitmap> getTypeClass() {
return Bitmap.class;
}

@Override
public boolean isKeyType() {
return true;
}

@Override
public boolean isSortKeyType() {
return false;
}

@Override
public TypeSerializer<Bitmap> createSerializer(SerializerConfig config) {
return BitmapSerializer.INSTANCE;
}

@Override
public String toString() {
return Bitmap.class.getSimpleName();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof BitmapTypeInfo) {
BitmapTypeInfo other = (BitmapTypeInfo) obj;
return other.canEqual(this);
} else {
return false;
}
}

@Override
public int hashCode() {
return BitmapTypeInfo.class.hashCode();
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof BitmapTypeInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.types.Either;
import org.apache.flink.types.Row;
import org.apache.flink.types.Value;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;

import java.lang.reflect.Field;
Expand Down Expand Up @@ -159,6 +160,12 @@ public class Types {

public static final TypeInformation<Variant> VARIANT = VariantTypeInfo.INSTANCE;

/**
* Returns type information for {@link org.apache.flink.types.bitmap.Bitmap}. Supports a null
* value.
*/
public static final TypeInformation<Bitmap> BITMAP = BitmapTypeInfo.INSTANCE;

// CHECKSTYLE.OFF: MethodName

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils.base;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.bitmap.Bitmap;

import java.io.IOException;

/** Serializer for {@link Bitmap}. */
@Internal
public class BitmapSerializer extends TypeSerializerSingleton<Bitmap> {

public static final BitmapSerializer INSTANCE = new BitmapSerializer();

@Override
public boolean isImmutableType() {
return false;
}

@Override
public Bitmap createInstance() {
return Bitmap.empty();
}

@Override
public Bitmap copy(Bitmap from) {
return Bitmap.from(from);
}

@Override
public Bitmap copy(Bitmap from, Bitmap reuse) {
return copy(from);
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(Bitmap record, DataOutputView target) throws IOException {
byte[] bytes = record.toBytes();
target.writeInt(bytes.length);
target.write(bytes);
}

@Override
public Bitmap deserialize(DataInputView source) throws IOException {
int length = source.readInt();
byte[] bytes = new byte[length];
source.read(bytes);
return Bitmap.fromBytes(bytes);
}

@Override
public Bitmap deserialize(Bitmap reuse, DataInputView source) throws IOException {
return this.deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
int length = source.readInt();
target.writeInt(length);
target.write(source, length);
}

@Override
public TypeSerializerSnapshot<Bitmap> snapshotConfiguration() {
return new BitmapSerializerSnapshot();
}

@Internal
public static final class BitmapSerializerSnapshot
extends SimpleTypeSerializerSnapshot<Bitmap> {
/** Constructor to create snapshot from serializer (writing the snapshot). */
public BitmapSerializerSnapshot() {
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.BitmapTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfo;
Expand All @@ -50,6 +51,7 @@
import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
import org.apache.flink.types.Row;
import org.apache.flink.types.Value;
import org.apache.flink.types.bitmap.Bitmap;
import org.apache.flink.types.variant.Variant;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -1977,6 +1979,11 @@ private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
return (TypeInformation<OUT>) VariantTypeInfo.INSTANCE;
}

// check for Bitmap
if (Bitmap.class.isAssignableFrom(clazz)) {
return (TypeInformation<OUT>) BitmapTypeInfo.INSTANCE;
}

// check for parameterized Collections, requirement:
// 1. Interface types: the underlying implementation types are not preserved across
// serialization
Expand Down
Loading