티스토리 뷰

Avro

Avro - Serializer/Deserializer 비교

소농배 2022. 2. 18. 16:03

BinaryMessageEncoder / BinaryMessageDecoder

예제 코드

public class BinaryMessageEncoderMain {

	public static final String TEST_FILE_NAME = "serizlied-object.avro";
	public static void main(String[] args) throws IOException {
		AvroExampleDto before = AvroExampleDto.newBuilder().setTestNickName("Woongs").setTestTeamName("Platform").setTestName("Sondevv").build();
		write(before);

		AvroExampleDto after = read();
		System.out.println(after);
	}

	public static void write(AvroExampleDto exampleDto) throws IOException {
		BinaryMessageEncoder binaryMessageEncoder = new BinaryMessageEncoder(new SpecificData(), AvroExampleDto.getClassSchema());
		ByteBuffer byteBuffer = binaryMessageEncoder.encode(exampleDto);
		BufferedWriter writer = new BufferedWriter(new FileWriter(TEST_FILE_NAME));
		writer.write(Bytes.toHexString(byteBuffer));
		writer.close();
	}

	public static AvroExampleDto read() throws IOException {
		BufferedReader reader = new BufferedReader(new FileReader(TEST_FILE_NAME));
		String data = reader.readLine();
		AvroExampleSchemaStore exampleSchemaStore = new AvroExampleSchemaStore();
		BinaryMessageDecoder<AvroExampleDto> binaryMessageDecoder = new BinaryMessageDecoder<>(new SpecificData(), AvroExampleDto.getClassSchema(), exampleSchemaStore);
		return binaryMessageDecoder.decode(Bytes.fromHexString(data));
	}

}

class AvroExampleSchemaStore implements SchemaStore {

	HashMap<Long, Schema> SCHEMA_MAP = new HashMap<>();
	public AvroExampleSchemaStore() {
		SCHEMA_MAP.put(SchemaNormalization.parsingFingerprint64(AvroExampleDto.getClassSchema()), AvroExampleDto.getClassSchema());
	}
	@Override
	public Schema findByFingerprint(long fingerprint) {
		return SCHEMA_MAP.get(fingerprint);
	}
}

 

직렬화된 데이터

0xc301b1b769888689b0800c576f6f6e677310506c6174666f726d0e536f6e64657676

결과

  • BinaryMessageEncoder / BinaryMessage 는 Byte Array 로 객체를 직렬화 한다.
  • 설명에서 알 수 있듯이 직렬화된 데이터는 헤더 값을 갖는다. 직렬화된 데이터의 시작부분인 c301 이 Avro 메시지임을 나타낸다.
  • c301 뒤로는 직렬화된 Schema 의 Fingerprint 가 저장된다.
    • 이는 아래 코드를 실행하여 해당 Schema 의 fingerprint 를 확인해보면 알 수 있다
    • System.out.println(Bytes.toHexString(SchemaNormalization.parsingFingerprint("CRC-64-AVRO", AvroExampleDto.getClassSchema())));
    • 위 코드의 결과는 0xb1b769888689b080 와 같다. 즉 직렬화된 데이터의 c301 뒤의 byte 들 중에 b1b769888689b080 까지는 해당 Schema 의 fingerprint 임을 알 수 있다.
  • Fingerprint 이후의 byte 들은 실제 데이터를 byte 로 직렬화한 결과이다
  • BinaryMessageEncoder 는 SchemaStore 를 전달할 수 있는데 역직렬화시에 SchemaStore 의 findByFingerprint() 함수를 호출하여 직렬화된 데이터에 저장되어있는 fingerprint 기준으로 Schema 를 찾아 writer's schema 로 사용한다.
  • SchemaStore 를 다양하게 구현하여 Schema 저장소로 활용할 수 있다.

 

BinaryEncoder, DatumWriter / BinaryDecoder, DatumReader

예제 코드

public class BinaryEncoder {

	public static final String TEST_FILE_NAME = "serizlied-object-binary-encode.avro";

	public static void main(String[] args) throws IOException {
		AvroExampleDto before = AvroExampleDto.newBuilder().setTestNickName("Woongs").setTestTeamName("Platform").setTestName("Sondevv").build();
		write(before);

		AvroExampleDto after = read();
		System.out.println(after);
	}

	public static void write(AvroExampleDto exampleDto) throws IOException {
		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
		org.apache.avro.io.BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
		DatumWriter<AvroExampleDto> datumWriter = new GenericDatumWriter<>(exampleDto.getSchema());
		datumWriter.write(exampleDto, binaryEncoder);
		binaryEncoder.flush();
		byteArrayOutputStream.close();
		ByteBuffer byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
		BufferedWriter writer = new BufferedWriter(new FileWriter(TEST_FILE_NAME));
		writer.write(Bytes.toHexString(byteBuffer));
		writer.close();
	}

	public static AvroExampleDto read() throws IOException {
		BufferedReader reader = new BufferedReader(new FileReader(TEST_FILE_NAME));
		String data = reader.readLine();
		DatumReader<AvroExampleDto> datumReader = new SpecificDatumReader<>(AvroExampleDto.getClassSchema(), AvroExampleDto.getClassSchema());
		Decoder decoder = DecoderFactory.get().binaryDecoder(Bytes.fromHexString(data).array(), null);
		return datumReader.read(null, decoder);
	}
}

직렬화된 데이터

0x0c576f6f6e677310506c6174666f726d0e536f6e64657676

 

결과

  • BinaryEncoder 는 직렬화된 데이터에 Schema 정보와 fingerprint 를 전달하지 않는다. 따라서 역직렬화할때 DatumReader 에게 reader's schema 와 writer's schema 를 전달해야한다.
  • BinaryMessageEncoder 와 비교했을때 헤더 정보와 fingerprint 정보가 데이터에 쓰여지지 않기 때문에 데이터 길이가 짧다
  • BinaryMessageEncoder 의 직렬화 결과와 비교했을때 fingerprint 이후의 데이터는 모두 일치하는 것을 볼 수 있으므로 데이터 부분만 직렬화되어 저장된 것을 볼 수 있다.
  •  writer's schema 와 reader's schema 가 다를 경우에 각각을 DatumReader 에게 전달하지 않으면 default 가 정상적으로 동작하지 않는다. 따라서 두 schema 가 다를 가능성이 있다면 (배포 상황에서 대부분 다를 수 있다) Write 할 당시의 Schema 정보를 함께 저장했다가 Read 시에 DatumReader 에게 전달해야한다.
  • Version1 Schema 로 만들어진 데이터를 Version2 Schema 로 읽어오려할 경우에는 예외가 발생한다
    • java.io.EOFException
          at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:542)
          at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:205)
          at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:169)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:198)
          at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
          at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
          at com.coupang.Main.lambda$readBinaryDecoder$1(Main.java:143)
          at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
          at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
          at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
          at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
          at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
          at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
          at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
  • Version1 으로 Generation 된 Class 로 Version2 Schema 로 직렬화된 데이터를 불러올 수 있는지 확인
    • 배포시에 New 서버에서는 Version2 Schema 로 저장을 하고 Old 서버는 Version1 으로 Generation 된 코드로 구동중이므로 발생할 수 있는 상황
    • Version2 에서 새로 추가된 Field 는 무시되면서 역직렬화 성공 : Schema Resolution 에 설명되어있음.
  • Version2 로 Generation 된 Class 로 Version1 Schema 로 직렬화된 데이터를 불러올 수 있는지 확인
    • 배포시에 Old 서버는 Version1 Schema 로 데이터를 계속 직렬화하여 저장하고 있고 New 서버는 Version2 로 Generation 된 코드로 역직렬화를 시도하므로 발생할 수 있는 상황
    • 문제 없이 동장하며 새로운 필드는 default 값이 채워진다. : Schema Resolution 에 설명되어있음. 단. DatumReader 에게 reader's schema 와 writer's schema 를 정상적으로 전달해야 default 값 정상 동작.

JsonEncooder, DatumWriter / JsonDecoder, DatumReader

예제코드

public class JsonEncoder {

	public static final String TEST_FILE_NAME = "serizlied-object-json-encode.avro";

	public static void main(String[] args) throws IOException {
		AvroExampleDto before = AvroExampleDto.newBuilder().setTestNickName("Woongs").setTestTeamName("Platform").setTestName("Sondevv").build();
		write(before);

		AvroExampleDto after = read();
		System.out.println(after);
	}

	public static void write(AvroExampleDto exampleDto) throws IOException {
		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
		org.apache.avro.io.JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(AvroExampleDto.getClassSchema(), byteArrayOutputStream);
		DatumWriter<AvroExampleDto> datumWriter = new GenericDatumWriter<>(exampleDto.getSchema());
		datumWriter.write(exampleDto, jsonEncoder);
		jsonEncoder.flush();
		BufferedWriter writer = new BufferedWriter(new FileWriter(TEST_FILE_NAME));
		writer.write(byteArrayOutputStream.toString());
		writer.close();
		byteArrayOutputStream.close();

	}

	public static AvroExampleDto read() throws IOException {
		BufferedReader reader = new BufferedReader(new FileReader(TEST_FILE_NAME));
		String data = reader.readLine();
		DatumReader<AvroExampleDto> datumReader = new SpecificDatumReader<>(AvroExampleDto.getClassSchema(), AvroExampleDto.getClassSchema());
		Decoder jsonDecoder = DecoderFactory.get().jsonDecoder(AvroExampleDto.getClassSchema(), data);
		return datumReader.read(null, jsonDecoder);
	}
}

직렬화된 데이터

{"testNickName":"Woongs","testTeamName":"Platform","testName":"Sondevv"}

결과

  • BinaryEncoder 와 마찬가지로 Schema 의 정보를 직렬화 데이터에 포함하지 않기 때문에 DatumReader / DatumWriter 에게 Schema 정보를 전달해야하며 DaturmReader 에게는 reader's schema, writer's schema 를 전달해야한다.
  • BinaryEncoder 와는 달리 JsonEncoder 에게도 Schema 를 전달해야하며 이는 Json 을 Parsing 할때 사용되기 때문에 writer's schema 를 전달해야한다. reader's schema 를 전달할 경우에 파싱할 수 없는 토큰이 Json 내에 포함되어 아래 예외 발생
    • java.io.EOFException
          at org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:131)
          at org.apache.avro.io.JsonDecoder.skipString(JsonDecoder.java:230)
          at org.apache.avro.io.ResolvingDecoder.skipString(ResolvingDecoder.java:230)
          at org.apache.avro.io.ParsingDecoder.skipTopSymbol(ParsingDecoder.java:61)
          at org.apache.avro.io.parsing.SkipParser.skipTo(SkipParser.java:65)
          at org.apache.avro.io.parsing.SkipParser.skipSymbol(SkipParser.java:88)
          at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:303)
          at org.apache.avro.io.parsing.Parser.processImplicitActions(Parser.java:112)
          at org.apache.avro.io.ResolvingDecoder.drain(ResolvingDecoder.java:157)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
          at com.coupang.Main.lambda$readJsonDecoder$2(Main.java:240)
          at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
          at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
          at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
          at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
          at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
          at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
          at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
  • Binary Encoder 와는 달리 눈으로 확인할 수 있는 데이터가 저장되기 때문에 디버깅에 용이하지만 Binary 보다 성능이 떨어진다고 공식 문서에 작성되어있음
  • Version1 으로 Generation 된 Class 로 Version2 Schema 로 직렬화된 데이터를 불러올 수 있는지 확인
    • 배포시에 New 서버에서는 Version2 Schema 로 저장을 하고 Old 서버는 Version1 으로 Generation 된 코드로 구동중이므로 발생할 수 있는 상황
    • Version2 에서 새로 추가된 Field 는 무시되면서 역직렬화 성공 : Schema Resolution 에 설명되어있음.
  • Version2 로 Generation 된 Class 로 Version1 Schema 로 직렬화된 데이터를 불러올 수 있는지 확인
    • 배포시에 Old 서버는 Version1 Schema 로 데이터를 계속 직렬화하여 저장하고 있고 New 서버는 Version2 로 Generation 된 코드로 역직렬화를 시도하므로 발생할 수 있는 상황
    • 문제 없이 동장하며 새로운 필드는 default 값이 채워진다. : Schema Resolution 에 설명되어있음. 단. DatumReader 에게 reader's schema 와 writer's schema 를 정상적으로 전달해야 default 값 정상 동작.

 

'Avro' 카테고리의 다른 글

Apache Avro - 설명  (0) 2022.02.17
Avro 시작하기 - Java, Gradle  (0) 2022.02.17
Apache Avro 소개  (0) 2022.02.17
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함