diff --git a/README.md b/README.md
index 68ac992..2b12b87 100644
--- a/README.md
+++ b/README.md
@@ -45,8 +45,8 @@ It is intended that the application uses a certificate to expose its API and use
## Notice
* Compatible with **JDK 8, 11, 15, 16 and 17**
- * Compatible with **schema-registry version 5.3.1 or later**
- * Compatible with **avro version 1.9.1 or later**
+ * Compatible with **schema-registry version 7.1.3 or later**
+ * Compatible with **avro version 1.11.1 or later**
## 1. Quick Start
@@ -118,7 +118,7 @@ You can pull it from the central Maven repositories:
com.github.mvallim
spring-schema-registry
- 2.1.0
+ 2.2.0
```
@@ -138,7 +138,7 @@ If you want to try a snapshot version, add the following repository:
#### Gradle
```groovy
-implementation 'com.github.mvallim:spring-schema-registry:2.1.0'
+implementation 'com.github.mvallim:spring-schema-registry:2.2.0'
```
If you want to try a snapshot version, add the following repository:
diff --git a/pom.xml b/pom.xml
index 648ffcf..d096797 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
com.github.mvallim
spring-schema-registry
- 2.1.1-SNAPSHOT
+ 2.2.0-SNAPSHOT
jar
spring-schema-registry
@@ -14,11 +14,11 @@
1.8
UTF-8
UTF-8
- 5.3.1
+ 7.1.3
3.12.0
- 2.2.13.RELEASE
- 1.9.1
- 1.18.20
+ 2.7.2
+ 1.11.1
+ 1.18.24
1.4
@@ -67,6 +67,7 @@
org.springframework
spring-core
+ 5.3.22
@@ -79,6 +80,7 @@
org.projectlombok
lombok
+ ${lombok.version}
true
@@ -99,33 +101,32 @@
+
- junit
- junit
- test
-
-
-
- org.hamcrest
- hamcrest-library
+ org.junit.jupiter
+ junit-jupiter-engine
+ 5.9.0
test
org.apache.kafka
- kafka_2.11
+ kafka_2.13
+ 3.1.1
test
ch.qos.logback
logback-classic
+ 1.2.11
test
org.assertj
assertj-core
+ 3.23.1
test
@@ -204,7 +205,7 @@
org.apache.maven.plugins
maven-surefire-plugin
- 2.22.2
+ 3.0.0-M7
false
${jacoco.argLine} -Dfile.encoding=${project.build.sourceEncoding}
@@ -214,7 +215,7 @@
org.jacoco
jacoco-maven-plugin
- 0.8.6
+ 0.8.8
**/test/**
@@ -245,14 +246,14 @@
org.apache.maven.plugins
maven-clean-plugin
- 2.5
+ 3.2.0
true
org.apache.maven.plugins
maven-compiler-plugin
- 3.7.0
+ 3.10.1
${java.version}
${java.version}
@@ -274,7 +275,7 @@
org.apache.maven.plugins
maven-resources-plugin
- 3.0.2
+ 3.2.0
${project.build.sourceEncoding}
@@ -283,7 +284,7 @@
org.apache.maven.plugins
maven-source-plugin
- 2.2.1
+ 3.2.1
true
@@ -298,14 +299,14 @@
org.apache.maven.plugins
maven-install-plugin
- 2.5.1
+ 3.0.1
true
org.apache.maven.plugins
maven-deploy-plugin
- 2.8.1
+ 3.0.0
true
true
@@ -394,7 +395,7 @@
org.apache.maven.plugins
maven-source-plugin
- 2.2.1
+ 3.2.1
package
@@ -407,7 +408,7 @@
org.apache.maven.plugins
maven-javadoc-plugin
- 2.9.1
+ 3.3.2
package
diff --git a/src/main/java/org/springframework/schemaregistry/deserializer/GenericKafkaAvroDeserializer.java b/src/main/java/org/springframework/schemaregistry/deserializer/GenericKafkaAvroDeserializer.java
index 8a464c6..07fb719 100644
--- a/src/main/java/org/springframework/schemaregistry/deserializer/GenericKafkaAvroDeserializer.java
+++ b/src/main/java/org/springframework/schemaregistry/deserializer/GenericKafkaAvroDeserializer.java
@@ -4,6 +4,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -36,7 +37,7 @@ public GenericKafkaAvroDeserializer(final SchemaRegistryClient schemaRegistryCli
@Override
@SneakyThrows
- protected Object deserialize(final boolean includeSchemaAndVersion, final String topic, final Boolean isKey, final byte[] payload, final Schema readerSchema) throws SerializationException {
+ protected Object deserialize(final String topic, final Boolean isKey, final byte[] payload, final Schema readerSchema) throws SerializationException {
final ByteBuffer buffer = ByteBuffer.wrap(payload);
@@ -44,16 +45,16 @@ protected Object deserialize(final boolean includeSchemaAndVersion, final String
throw new SerializationException("Unknown magic byte!");
}
- final Schema schema = schemaRegistry.getById(buffer.getInt());
+ final AvroSchema schema = (AvroSchema)schemaRegistry.getSchemaById(buffer.getInt());
- final String fullName = schema.getFullName();
+ final String fullName = schema.name();
if (constraintClass(fullName)) {
- return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, readerSchema);
+ return super.deserialize(topic, isKey, payload, readerSchema);
} else {
final int length = buffer.limit() - 1 - idSize;
final int start = buffer.position() + buffer.arrayOffset();
- final DatumReader datumReader = new GenericDatumReader<>(schema);
+ final DatumReader datumReader = new GenericDatumReader<>(schema.rawSchema());
return datumReader.read(null, decoderFactory.binaryDecoder(buffer.array(), start, length, null));
}
diff --git a/src/main/java/org/springframework/schemaregistry/deserializer/SpecificKafkaAvroDeserializer.java b/src/main/java/org/springframework/schemaregistry/deserializer/SpecificKafkaAvroDeserializer.java
index 95f0935..062bd4e 100644
--- a/src/main/java/org/springframework/schemaregistry/deserializer/SpecificKafkaAvroDeserializer.java
+++ b/src/main/java/org/springframework/schemaregistry/deserializer/SpecificKafkaAvroDeserializer.java
@@ -5,13 +5,13 @@
import javax.net.ssl.SSLSocketFactory;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.schemaregistry.core.SchemaRegistrySSLSocketFactory;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
@@ -34,7 +34,7 @@ public void configure(final Map configs, final boolean isKey) {
if (Objects.isNull(schemaRegistry)) {
- final AbstractKafkaAvroSerDeConfig deserializerConfig = new KafkaAvroDeserializerConfig(configs);
+ final AbstractKafkaSchemaSerDeConfig deserializerConfig = new KafkaAvroDeserializerConfig(configs);
final RestService restService = new RestService(deserializerConfig.getSchemaRegistryUrls());
diff --git a/src/main/java/org/springframework/schemaregistry/serializer/SpecificKafkaAvroSerializer.java b/src/main/java/org/springframework/schemaregistry/serializer/SpecificKafkaAvroSerializer.java
index ff01614..cb97f0e 100644
--- a/src/main/java/org/springframework/schemaregistry/serializer/SpecificKafkaAvroSerializer.java
+++ b/src/main/java/org/springframework/schemaregistry/serializer/SpecificKafkaAvroSerializer.java
@@ -5,13 +5,13 @@
import javax.net.ssl.SSLSocketFactory;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.schemaregistry.core.SchemaRegistrySSLSocketFactory;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
@@ -34,7 +34,7 @@ public void configure(final Map configs, final boolean isKey) {
if (Objects.isNull(schemaRegistry)) {
- final AbstractKafkaAvroSerDeConfig serializerConfig = new KafkaAvroSerializerConfig(configs);
+ final AbstractKafkaSchemaSerDeConfig serializerConfig = new KafkaAvroSerializerConfig(configs);
final RestService restService = new RestService(serializerConfig.getSchemaRegistryUrls());
diff --git a/src/test/java/org/springframework/schemaregistry/core/SchemaRegistrySSLSocketFactoryTest.java b/src/test/java/org/springframework/schemaregistry/core/SchemaRegistrySSLSocketFactoryTest.java
index 90eaafc..45b70c2 100644
--- a/src/test/java/org/springframework/schemaregistry/core/SchemaRegistrySSLSocketFactoryTest.java
+++ b/src/test/java/org/springframework/schemaregistry/core/SchemaRegistrySSLSocketFactoryTest.java
@@ -1,22 +1,21 @@
package org.springframework.schemaregistry.core;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import java.security.NoSuchAlgorithmException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
+import java.security.NoSuchAlgorithmException;
+
+import static org.assertj.core.api.Assertions.assertThat;
-import org.junit.Before;
-import org.junit.Test;
public class SchemaRegistrySSLSocketFactoryTest {
private final SslSocketFactoryConfig properties = new SslSocketFactoryConfig();
- @Before
+ @BeforeEach
public void SetUp() throws NoSuchAlgorithmException {
properties.setProtocol("SSL");
properties.setKeyPassword("changeit");
@@ -36,7 +35,7 @@ public void whenMinimalValidConfigurationGetSslSocketFactorySuccess() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, notNullValue());
+ assertThat(sslSocketFactory).isNotNull();
}
@Test
@@ -46,7 +45,7 @@ public void whenInvalidSSLConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -56,7 +55,7 @@ public void whenInvalidProviderConfigurationGetSslSocketFactorySuccess() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, notNullValue());
+ assertThat(sslSocketFactory).isNotNull();
}
@Test
@@ -66,7 +65,7 @@ public void whenInvalidKeyPassworConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -76,7 +75,7 @@ public void whenInvalidKeyPassworIsNullConfigurationGetSslSocketFactorySuccess()
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, notNullValue());
+ assertThat(sslSocketFactory).isNotNull();
}
@Test
@@ -86,7 +85,7 @@ public void whenInvalidProtocolConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -96,7 +95,7 @@ public void whenInvalidKeyStoreLocationConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -106,7 +105,7 @@ public void whenInvalidKeyStorePasswordConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -116,7 +115,7 @@ public void whenInvalidKeyStoreTypeConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -126,7 +125,7 @@ public void whenInvalidKeyManagerAlgorithmConfigurationGetSslSocketFactoryFail()
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -136,7 +135,7 @@ public void whenNullKeyStoreLocationConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -146,7 +145,7 @@ public void whenNullKeyStorePasswordConfigurationGetSslSocketFactorySuccess() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, notNullValue());
+ assertThat(sslSocketFactory).isNotNull();
}
@Test
@@ -157,7 +156,7 @@ public void whenNullKeyPasswordAndKeyStorePasswordConfigurationGetSslSocketFacto
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -167,7 +166,7 @@ public void whenNullKeyStoreTypeConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -177,7 +176,7 @@ public void whenNullKeyManagerAlgorithmConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, notNullValue());
+ assertThat(sslSocketFactory).isNotNull();
}
@Test
@@ -187,7 +186,7 @@ public void whenInvalidTrustStoreLocationConfigurationGetSslSocketFactoryFail()
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -197,7 +196,7 @@ public void whenInvalidTrustStorePasswordConfigurationGetSslSocketFactoryFail()
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -207,7 +206,7 @@ public void whenInvalidTrustStoreTypeConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -217,7 +216,7 @@ public void whenInvalidTrustManagerAlgorithmConfigurationGetSslSocketFactoryFail
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -227,7 +226,7 @@ public void whenNullTrustStoreLocationConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -237,7 +236,7 @@ public void whenNullTrustStorePasswordConfigurationGetSslSocketFactorySuccess()
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, notNullValue());
+ assertThat(sslSocketFactory).isNotNull();
}
@Test
@@ -247,7 +246,7 @@ public void whenNullTrustStoreTypeConfigurationGetSslSocketFactoryFail() {
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, nullValue());
+ assertThat(sslSocketFactory).isNull();
}
@Test
@@ -257,7 +256,7 @@ public void whenNullTrustManagerAlgorithmConfigurationGetSslSocketFactoryFail()
final SSLSocketFactory sslSocketFactory = SchemaRegistrySSLSocketFactory.createSslSocketFactory(properties);
- assertThat(sslSocketFactory, notNullValue());
+ assertThat(sslSocketFactory).isNotNull();
}
}
diff --git a/src/test/java/org/springframework/schemaregistry/deserializer/AvroDeserializerTest.java b/src/test/java/org/springframework/schemaregistry/deserializer/AvroDeserializerTest.java
index fc852bb..f38dd24 100644
--- a/src/test/java/org/springframework/schemaregistry/deserializer/AvroDeserializerTest.java
+++ b/src/test/java/org/springframework/schemaregistry/deserializer/AvroDeserializerTest.java
@@ -1,19 +1,16 @@
package org.springframework.schemaregistry.deserializer;
-import static org.assertj.core.api.Assertions.catchThrowable;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.nullValue;
-
-import java.io.IOException;
-
+import example.avro.User;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import org.springframework.schemaregistry.serializer.AvroSerializer;
-import example.avro.User;
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
public class AvroDeserializerTest {
@@ -26,7 +23,7 @@ public void testSuccessDeserializer() throws IOException {
final RecordHeaders headers = new RecordHeaders();
final byte[] bs = avroSerializer.serialize("bogus", headers, createAvroRecord());
final SpecificRecord specificRecord = avroDeserializer.deserialize("bogus", headers, bs);
- assertThat(specificRecord, instanceOf(User.class));
+ assertThat(specificRecord).isInstanceOf(User.class);
}
@Test
@@ -34,19 +31,20 @@ public void testSuccessDeserializerNullData() throws IOException {
final RecordHeaders headers = new RecordHeaders();
final byte[] bs = avroSerializer.serialize("bogus", headers, null);
final SpecificRecord specificRecord = avroDeserializer.deserialize("bogus", headers, bs);
- assertThat(specificRecord, nullValue());
+ assertThat(specificRecord).isNull();
}
@Test
public void testSuccessDeserializerRiseSerializationException() throws IOException {
- final Throwable throwable = catchThrowable(() -> avroDeserializer.deserialize("bogus", null, new byte[] { 0 }));
- assertThat(throwable, instanceOf(SerializationException.class));
+ assertThatThrownBy(() -> avroDeserializer.deserialize("bogus", null, new byte[] { 0 }))
+ .isInstanceOf(SerializationException.class);
+
}
@Test
public void testSuccessDeserializerRiseUnsupportedOperationException() throws IOException {
- final Throwable throwable = catchThrowable(() -> avroDeserializer.deserialize("bogus", new byte[] { 0 }));
- assertThat(throwable, instanceOf(UnsupportedOperationException.class));
+ assertThatThrownBy(() -> avroDeserializer.deserialize("bogus", new byte[] { 0 }))
+ .isInstanceOf(UnsupportedOperationException.class);
}
private SpecificRecord createAvroRecord() throws IOException {
diff --git a/src/test/java/org/springframework/schemaregistry/deserializer/GenericKafkaAvroDeserializerTest.java b/src/test/java/org/springframework/schemaregistry/deserializer/GenericKafkaAvroDeserializerTest.java
index edc1251..286fffe 100644
--- a/src/test/java/org/springframework/schemaregistry/deserializer/GenericKafkaAvroDeserializerTest.java
+++ b/src/test/java/org/springframework/schemaregistry/deserializer/GenericKafkaAvroDeserializerTest.java
@@ -1,14 +1,12 @@
package org.springframework.schemaregistry.deserializer;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import example.avro.User;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
@@ -17,16 +15,19 @@
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.springframework.schemaregistry.serializer.SpecificKafkaAvroSerializer;
import org.springframework.util.ResourceUtils;
-import example.avro.User;
-import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
-import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
public class GenericKafkaAvroDeserializerTest {
@@ -40,12 +41,12 @@ public class GenericKafkaAvroDeserializerTest {
private final static String TOPIC = "xpto";
- @Before
+ @BeforeEach
public void setUp() {
props = new HashMap<>();
- props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
- props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
+ props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
+ props.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
final Properties defaultConfig = new Properties();
@@ -80,48 +81,54 @@ private IndexedRecord createKnowAvroRecord() throws IOException {
@Test
public void assertSchemaValidAndAvroLocalExists() throws IOException, RestClientException {
final IndexedRecord avroRecord = createKnowAvroRecord();
- schemaRegistry.register(TOPIC + "-value", avroRecord.getSchema());
+ final AvroSchema avroSchema = new AvroSchema(avroRecord.getSchema());
+ schemaRegistry.register(TOPIC + "-value", avroSchema);
final byte[] bytes = serializer.serialize(TOPIC, avroRecord);
final Object deserialize = deserializer.deserialize(TOPIC, bytes);
- assertThat(deserialize.getClass(), equalTo(User.class));
+
+ assertThat(deserialize.getClass()).isEqualTo(User.class);
}
@Test
public void assertSchemaValidAndAvroLocalExistsAndCached() throws IOException, RestClientException {
final IndexedRecord avroRecord = createKnowAvroRecord();
- schemaRegistry.register(TOPIC + "-value", avroRecord.getSchema());
+ final AvroSchema avroSchema = new AvroSchema(avroRecord.getSchema());
+ schemaRegistry.register(TOPIC + "-value", avroSchema);
final byte[] bytes = serializer.serialize(TOPIC, avroRecord);
deserializer.deserialize(TOPIC, bytes);
final Object deserialize = deserializer.deserialize(TOPIC, bytes);
- assertThat(deserialize.getClass(), equalTo(User.class));
+ assertThat(deserialize.getClass()).isEqualTo(User.class);
}
@Test
public void assertSchemaValidAndAvroLocalDontExists() throws IOException, RestClientException {
final IndexedRecord avroRecord = createUnknowAvroRecord();
- schemaRegistry.register(TOPIC + "-value", avroRecord.getSchema());
+ final AvroSchema avroSchema = new AvroSchema(avroRecord.getSchema());
+ schemaRegistry.register(TOPIC + "-value", avroSchema);
final byte[] bytes = serializer.serialize(TOPIC, avroRecord);
final Object deserialize = deserializer.deserialize(TOPIC, bytes);
- assertThat(deserialize.getClass(), equalTo(GenericData.Record.class));
+ assertThat(deserialize.getClass()).isEqualTo(GenericData.Record.class);
}
@Test
public void assertSchemaValidAndAvroLocalDontExistsAndCached() throws IOException, RestClientException {
final IndexedRecord avroRecord = createUnknowAvroRecord();
- schemaRegistry.register(TOPIC + "-value", avroRecord.getSchema());
+ final AvroSchema avroSchema = new AvroSchema(avroRecord.getSchema());
+ schemaRegistry.register(TOPIC + "-value", avroSchema);
final byte[] bytes = serializer.serialize(TOPIC, avroRecord);
deserializer.deserialize(TOPIC, bytes);
final Object deserialize = deserializer.deserialize(TOPIC, bytes);
- assertThat(deserialize.getClass(), equalTo(GenericData.Record.class));
+ assertThat(deserialize.getClass()).isEqualTo((GenericData.Record.class));
}
- @Test(expected = SerializationException.class)
- public void assertSchemaInvalidAndDataInvalid() throws IOException, RestClientException {
- deserializer.deserialize(TOPIC, new byte[] { 0x10 });
+ @Test
+ public void assertSchemaInvalidAndDataInvalid() {
+ assertThatThrownBy(() -> deserializer.deserialize(TOPIC, new byte[] { 0x10 }))
+ .isInstanceOf(SerializationException.class);
}
}
\ No newline at end of file
diff --git a/src/test/java/org/springframework/schemaregistry/deserializer/SpecificKafkaAvroDeserializerTest.java b/src/test/java/org/springframework/schemaregistry/deserializer/SpecificKafkaAvroDeserializerTest.java
index 6fd8ca3..24a3db9 100644
--- a/src/test/java/org/springframework/schemaregistry/deserializer/SpecificKafkaAvroDeserializerTest.java
+++ b/src/test/java/org/springframework/schemaregistry/deserializer/SpecificKafkaAvroDeserializerTest.java
@@ -1,18 +1,12 @@
package org.springframework.schemaregistry.deserializer;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import javax.net.ssl.SSLContext;
+import example.avro.User;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
@@ -20,17 +14,21 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.springframework.schemaregistry.core.SslSocketFactoryConfig;
import org.springframework.schemaregistry.serializer.SpecificKafkaAvroSerializer;
import org.springframework.util.ResourceUtils;
-import example.avro.User;
-import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
-import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
public class SpecificKafkaAvroDeserializerTest {
@@ -44,12 +42,12 @@ public class SpecificKafkaAvroDeserializerTest {
private final static String TOPIC = "xpto";
- @Before
+ @BeforeEach
public void setUp() {
props = new HashMap<>();
- props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
- props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
+ props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
+ props.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
final Properties defaultConfig = new Properties();
@@ -75,11 +73,12 @@ private IndexedRecord createAvroRecord() throws IOException {
@Test
public void assertSchemaValidAndAvroLocalExists() throws IOException, RestClientException {
final IndexedRecord avroRecord = createAvroRecord();
- schemaRegistry.register(TOPIC + "-value", avroRecord.getSchema());
+ final AvroSchema avroSchema = new AvroSchema(avroRecord.getSchema());
+ schemaRegistry.register(TOPIC + "-value", avroSchema);
final byte[] bytes = serializer.serialize(TOPIC, avroRecord);
final Object deserialize = deserializer.deserialize(TOPIC, bytes);
- assertThat(deserialize.getClass(), equalTo(User.class));
+ assertThat(deserialize.getClass()).isEqualTo(User.class);
}
@Test
@@ -96,24 +95,24 @@ public void testKafkaAvroDeserializerConfigureWithSSL() throws IOException, NoSu
properties.setTrustManagerAlgorithm("SunX509");
properties.setTrustStoreType("JKS");
properties.setProvider(SSLContext.getDefault().getProvider());
- properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
- properties.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
+ properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
+ properties.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
try (final Deserializer