티스토리 뷰
BinaryMessageEncoder / BinaryMessageDecoder
예제 코드
public class BinaryMessageEncoderMain {
public static final String TEST_FILE_NAME = "serizlied-object.avro";
public static void main(String[] args) throws IOException {
AvroExampleDto before = AvroExampleDto.newBuilder().setTestNickName("Woongs").setTestTeamName("Platform").setTestName("Sondevv").build();
write(before);
AvroExampleDto after = read();
System.out.println(after);
}
public static void write(AvroExampleDto exampleDto) throws IOException {
BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(new SpecificData(), AvroExampleDto.getClassSchema());
ByteBuffer byteBuffer = binaryMessageEncoder.encode(exampleDto);
BufferedWriter writer = new BufferedWriter(new FileWriter(TEST_FILE_NAME));
writer.write(Bytes.toHexString(byteBuffer));
writer.close();
}
public static AvroExampleDto read() throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(TEST_FILE_NAME));
String data = reader.readLine();
AvroExampleSchemaStore exampleSchemaStore = new AvroExampleSchemaStore();
BinaryMessageDecoder<AvroExampleDto> binaryMessageDecoder = new BinaryMessageDecoder<>(new SpecificData(), AvroExampleDto.getClassSchema(), exampleSchemaStore);
return binaryMessageDecoder.decode(Bytes.fromHexString(data));
}
}
class AvroExampleSchemaStore implements SchemaStore {
HashMap<Long, Schema> SCHEMA_MAP = new HashMap<>();
public AvroExampleSchemaStore() {
SCHEMA_MAP.put(SchemaNormalization.parsingFingerprint64(AvroExampleDto.getClassSchema()), AvroExampleDto.getClassSchema());
}
@Override
public Schema findByFingerprint(long fingerprint) {
return SCHEMA_MAP.get(fingerprint);
}
}
직렬화된 데이터
0xc301b1b769888689b0800c576f6f6e677310506c6174666f726d0e536f6e64657676
결과
- BinaryMessageEncoder / BinaryMessage 는 Byte Array 로 객체를 직렬화 한다.
- 설명에서 알 수 있듯이 직렬화된 데이터는 헤더 값을 갖는다. 직렬화된 데이터의 시작부분인 c301 이 Avro 메시지임을 나타낸다.
- c301 뒤로는 직렬화된 Schema 의 Fingerprint 가 저장된다.
- 이는 아래 코드를 실행하여 해당 Schema 의 fingerprint 를 확인해보면 알 수 있다
-
System.out.println(Bytes.toHexString(SchemaNormalization.parsingFingerprint("CRC-64-AVRO", AvroExampleDto.getClassSchema())));
- 위 코드의 결과는 0xb1b769888689b080 와 같다. 즉 직렬화된 데이터의 c301 뒤의 byte 들 중에 b1b769888689b080 까지는 해당 Schema 의 fingerprint 임을 알 수 있다.
- Fingerprint 이후의 byte 들은 실제 데이터를 byte 로 직렬화한 결과이다
- BinaryMessageEncoder 는 SchemaStore 를 전달할 수 있는데 역직렬화시에 SchemaStore 의 findByFingerprint() 함수를 호출하여 직렬화된 데이터에 저장되어있는 fingerprint 기준으로 Schema 를 찾아 writer's schema 로 사용한다.
- SchemaStore 를 다양하게 구현하여 Schema 저장소로 활용할 수 있다.
BinaryEncoder, DatumWriter / BinaryDecoder, DatumReader
예제 코드
public class BinaryEncoder {
public static final String TEST_FILE_NAME = "serizlied-object-binary-encode.avro";
public static void main(String[] args) throws IOException {
AvroExampleDto before = AvroExampleDto.newBuilder().setTestNickName("Woongs").setTestTeamName("Platform").setTestName("Sondevv").build();
write(before);
AvroExampleDto after = read();
System.out.println(after);
}
public static void write(AvroExampleDto exampleDto) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
org.apache.avro.io.BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
DatumWriter<AvroExampleDto> datumWriter = new GenericDatumWriter<>(exampleDto.getSchema());
datumWriter.write(exampleDto, binaryEncoder);
binaryEncoder.flush();
byteArrayOutputStream.close();
ByteBuffer byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
BufferedWriter writer = new BufferedWriter(new FileWriter(TEST_FILE_NAME));
writer.write(Bytes.toHexString(byteBuffer));
writer.close();
}
public static AvroExampleDto read() throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(TEST_FILE_NAME));
String data = reader.readLine();
DatumReader<AvroExampleDto> datumReader = new SpecificDatumReader<>(AvroExampleDto.getClassSchema(), AvroExampleDto.getClassSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(Bytes.fromHexString(data).array(), null);
return datumReader.read(null, decoder);
}
}
직렬화된 데이터
0x0c576f6f6e677310506c6174666f726d0e536f6e64657676
결과
- BinaryEncoder 는 직렬화된 데이터에 Schema 정보와 fingerprint 를 전달하지 않는다. 따라서 역직렬화할때 DatumReader 에게 reader's schema 와 writer's schema 를 전달해야한다.
- BinaryMessageEncoder 와 비교했을때 헤더 정보와 fingerprint 정보가 데이터에 쓰여지지 않기 때문에 데이터 길이가 짧다
- BinaryMessageEncoder 의 직렬화 결과와 비교했을때 fingerprint 이후의 데이터는 모두 일치하는 것을 볼 수 있으므로 데이터 부분만 직렬화되어 저장된 것을 볼 수 있다.
- writer's schema 와 reader's schema 가 다를 경우에 각각을 DatumReader 에게 전달하지 않으면 default 가 정상적으로 동작하지 않는다. 따라서 두 schema 가 다를 가능성이 있다면 (배포 상황에서 대부분 다를 수 있다) Write 할 당시의 Schema 정보를 함께 저장했다가 Read 시에 DatumReader 에게 전달해야한다.
- Version1 Schema 로 만들어진 데이터를 Version2 Schema 로 읽어오려할 경우에는 예외가 발생한다
-
java.io.EOFException at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:542) at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:205) at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:169) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:198) at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) at com.coupang.Main.lambda$readBinaryDecoder$1(Main.java:143) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
-
- Version1 으로 Generation 된 Class 로 Version2 Schema 로 직렬화된 데이터를 불러올 수 있는지 확인
- 배포시에 New 서버에서는 Version2 Schema 로 저장을 하고 Old 서버는 Version1 으로 Generation 된 코드로 구동중이므로 발생할 수 있는 상황
- Version2 에서 새로 추가된 Field 는 무시되면서 역직렬화 성공 : Schema Resolution 에 설명되어있음.
- Version2 로 Generation 된 Class 로 Version1 Schema 로 직렬화된 데이터를 불러올 수 있는지 확인
- 배포시에 Old 서버는 Version1 Schema 로 데이터를 계속 직렬화하여 저장하고 있고 New 서버는 Version2 로 Generation 된 코드로 역직렬화를 시도하므로 발생할 수 있는 상황
- 문제 없이 동장하며 새로운 필드는 default 값이 채워진다. : Schema Resolution 에 설명되어있음. 단. DatumReader 에게 reader's schema 와 writer's schema 를 정상적으로 전달해야 default 값 정상 동작.
JsonEncooder, DatumWriter / JsonDecoder, DatumReader
예제코드
public class JsonEncoder {
public static final String TEST_FILE_NAME = "serizlied-object-json-encode.avro";
public static void main(String[] args) throws IOException {
AvroExampleDto before = AvroExampleDto.newBuilder().setTestNickName("Woongs").setTestTeamName("Platform").setTestName("Sondevv").build();
write(before);
AvroExampleDto after = read();
System.out.println(after);
}
public static void write(AvroExampleDto exampleDto) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
org.apache.avro.io.JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(AvroExampleDto.getClassSchema(), byteArrayOutputStream);
DatumWriter<AvroExampleDto> datumWriter = new GenericDatumWriter<>(exampleDto.getSchema());
datumWriter.write(exampleDto, jsonEncoder);
jsonEncoder.flush();
BufferedWriter writer = new BufferedWriter(new FileWriter(TEST_FILE_NAME));
writer.write(byteArrayOutputStream.toString());
writer.close();
byteArrayOutputStream.close();
}
public static AvroExampleDto read() throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(TEST_FILE_NAME));
String data = reader.readLine();
DatumReader<AvroExampleDto> datumReader = new SpecificDatumReader<>(AvroExampleDto.getClassSchema(), AvroExampleDto.getClassSchema());
Decoder jsonDecoder = DecoderFactory.get().jsonDecoder(AvroExampleDto.getClassSchema(), data);
return datumReader.read(null, jsonDecoder);
}
}
직렬화된 데이터
{"testNickName":"Woongs","testTeamName":"Platform","testName":"Sondevv"}
결과
- BinaryEncoder 와 마찬가지로 Schema 의 정보를 직렬화 데이터에 포함하지 않기 때문에 DatumReader / DatumWriter 에게 Schema 정보를 전달해야하며 DaturmReader 에게는 reader's schema, writer's schema 를 전달해야한다.
- BinaryEncoder 와는 달리 JsonEncoder 에게도 Schema 를 전달해야하며 이는 Json 을 Parsing 할때 사용되기 때문에 writer's schema 를 전달해야한다. reader's schema 를 전달할 경우에 파싱할 수 없는 토큰이 Json 내에 포함되어 아래 예외 발생
-
java.io.EOFException at org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:131) at org.apache.avro.io.JsonDecoder.skipString(JsonDecoder.java:230) at org.apache.avro.io.ResolvingDecoder.skipString(ResolvingDecoder.java:230) at org.apache.avro.io.ParsingDecoder.skipTopSymbol(ParsingDecoder.java:61) at org.apache.avro.io.parsing.SkipParser.skipTo(SkipParser.java:65) at org.apache.avro.io.parsing.SkipParser.skipSymbol(SkipParser.java:88) at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:303) at org.apache.avro.io.parsing.Parser.processImplicitActions(Parser.java:112) at org.apache.avro.io.ResolvingDecoder.drain(ResolvingDecoder.java:157) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) at com.coupang.Main.lambda$readJsonDecoder$2(Main.java:240) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
-
- Binary Encoder 와는 달리 눈으로 확인할 수 있는 데이터가 저장되기 때문에 디버깅에 용이하지만 Binary 보다 성능이 떨어진다고 공식 문서에 작성되어있음
- Version1 으로 Generation 된 Class 로 Version2 Schema 로 직렬화된 데이터를 불러올 수 있는지 확인
- 배포시에 New 서버에서는 Version2 Schema 로 저장을 하고 Old 서버는 Version1 으로 Generation 된 코드로 구동중이므로 발생할 수 있는 상황
- Version2 에서 새로 추가된 Field 는 무시되면서 역직렬화 성공 : Schema Resolution 에 설명되어있음.
- Version2 로 Generation 된 Class 로 Version1 Schema 로 직렬화된 데이터를 불러올 수 있는지 확인
- 배포시에 Old 서버는 Version1 Schema 로 데이터를 계속 직렬화하여 저장하고 있고 New 서버는 Version2 로 Generation 된 코드로 역직렬화를 시도하므로 발생할 수 있는 상황
- 문제 없이 동장하며 새로운 필드는 default 값이 채워진다. : Schema Resolution 에 설명되어있음. 단. DatumReader 에게 reader's schema 와 writer's schema 를 정상적으로 전달해야 default 값 정상 동작.
'Avro' 카테고리의 다른 글
Apache Avro - 설명 (0) | 2022.02.17 |
---|---|
Avro 시작하기 - Java, Gradle (0) | 2022.02.17 |
Apache Avro 소개 (0) | 2022.02.17 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- notify()
- Seperate Chaining
- spring cloud gateway
- getBoolean
- AbstractMethodError
- msyql-connector-java
- DyanomoDB
- GlobalFilter
- router
- referencedColumnName
- wait()
- HashMap
- N+1
- RouteDefinition
- circurit breaker
- reative
- MariaDB
- ResultSet
- custom config data convertion
- mariada-connector
- notifyAll()
- mariadb-connector-j
- ConcurrentHashMap
- Flux
- aurora
- RoutePredication
- reactor
- rate limit
- Lazy
- dynamodb
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
글 보관함