mariadb-connector-j Aurora read/write seperate 과정
https://github.com/mariadb-corporation/mariadb-connector-j
https://mariadb.com/kb/en/failover-and-high-availability-with-mariadb-connector-j/#specifics-for-amazon-aurora
mariadb-connector-j 를 사용하여 Aurora DB 로 connection 을 맺게 되면 Transaction 의 readOnly 값에 따라서 read 와 write 가 서로 다른 DNS 로 전달되게 된다. 그 과정을 코드 레벨로 확인한다.
0. read/write seperate 동작 확인
Cluster(Writer) : dev56-a.cluster
Cluster(Reader) : dev56-a.cluster-ro
Writer : dev5601
Reader : dev5602
Aurora 클러스터는 위와 같이 구성되어있다.
Writer, Reader DNS 가 각각 생성되어있으며 Writer 한대 (5601) Reader 한대 (5602) 로 구성.
테스트코드
DruidPooledConnection connection = druidDataSource.getConnection();
Statement stmt = connection.createStatement();
connection.setReadOnly(false);
stmt.execute("SELECT 1 /*readOnly=false*/;");
connection.setReadOnly(true);
connection.createStatement().execute("SELECT 1 /*readOnly=true*/;");
readOnly:false 일 경우 Socket 정보
readOnly:true 일 경우 Socket 정보
Connection 의 ReadOnly 값에 따라서 Writer Endpoint(dev56) 을 사용할지 Reader Endpoint(dev5602) 를 사용할지 결정되고있다.
Read/Write 가 분리되는 과정
1. AuroraProtocol 과 AuroraListener 생성
1.1 AuroraProtocol 생성 및 FailoverProxy 세팅.
switch (urlParser.getHaMode()) {
case AURORA:
AuroraListener auroraListener;
if (urlParser.isAuroraDualRead()) {
auroraListener = new AuroraDualReadListener(urlParser, globalInfo);
} else {
auroraListener = new AuroraListener(urlParser, globalInfo);
}
return getProxyLoggingIfNeeded(
urlParser,
(Protocol)
Proxy.newProxyInstance(
AuroraProtocol.class.getClassLoader(),
new Class[] {Protocol.class},
new FailoverProxy(auroraListener, lock, traceCache)));
mariadb-connector-j 의 newConnection() 메서드가 호출이되면 HaMode 에 따라서 Protocol 이 생성된다. 이 Protocol 은 connector 와 DB 가 통신할때 사용되는 클래스이다.
HaMode.Aurora 인 경우 AuroraProtocol.java 이 사용된다.
또한 AuroroaProtocol 의 Proxy 로 FailoverPorxy 가 사용되는 것을 확인할 수 있고 이때 Listener 는 AuroraListener 가 사용된다.
즉, AOP 와 비슷한 동작을 하게 되는데 AuroraProtocol 의 특정 method 가 불리기 직전에 FailoverProxy 를 먼저 실행 시킬 수 있다.
1.2 Writer endpoint 를 이용해서 Reader endpoint 가져오기
protocol.setHostAddress(host);
protocol.connect();
FailerProxy.java 의 생성자에서 AuroraListener 를 init 하게 된다. 이때 loop() 함수가 호출이 된다. 이 과정에서 mariadb-connector 로 전달한 Wrtier endpoint 로 connect() 가 일어난다.
if (listener.isMasterHostFailReconnect() && protocol.isMasterConnection()) {
// Look for secondary when only known endpoint is the cluster endpoint
if (searchFilter.isFineIfFoundOnlyMaster()
&& listener.getUrlParser().getHostAddresses().size() <= 1
&& protocol.getHostAddress().equals(listener.getClusterHostAddress())) {
listener.retrieveAllEndpointsAndSet(protocol);
if (listener.getUrlParser().getHostAddresses().size() > 1) {
// add newly discovered end-point to loop
loopAddresses.addAll(listener.getUrlParser().getHostAddresses());
// since there is more than one end point, reactivate connection to a read-only host
searchFilter = new SearchFilter(false);
}
}
if (foundMaster(listener, protocol, searchFilter)) {
return;
}
}
Writer 로 connection 에 성공하게 되면 Master connection 일 경우에 위 로직을 타게 된다.
위 조건문에서 listener.retrieveAllEndpoinsAndSet(protocol); 함수의 내부구현을 보자.
public void retrieveAllEndpointsAndSet(Protocol protocol) throws SQLException {
// For a given cluster, same port for all endpoints and same end host address
if (clusterDnsSuffix != null) {
List<String> endpoints = getCurrentEndpointIdentifiers(protocol);
setUrlParserFromEndpoints(endpoints, protocol.getPort());
}
}
clusterDnsSuffix 는 Wrtier endpoint 에서 cluster 공통 부분을 추출한 suffix 이다.
getCurrentEndpointIdentifiers(protocol) 에서 Reader Instance 들의 endpoint 가 리턴된다.
protocol.executeQuery(
false,
results,
"select server_id, session_id from information_schema.replica_host_status "
+ "where last_update_timestamp > now() - INTERVAL 3 MINUTE");
results.commandEnd();
ResultSet resultSet = results.getResultSet();
while (resultSet.next()) {
endpoints.add(resultSet.getString(1) + "." + clusterDnsSuffix);
}
해당 함수의 핵심 부분만 확인하면 아까 protocol 에서 master 로 connection 에 성공했기 때문에 위와 같은 쿼리를 날리게 된다.
쿼리의 결과로 해당 클러스터의 인스턴스들의 server_id 가 리턴된다.
이 server_id 들에 clusterDnsSuffix 를 붙이게 된다.
1.3 Reader Endpoint 를 사용해 만든 protocol 을 secondaryProtocol 로 저장
public void lockAndSwitchSecondary(Protocol newSecondaryProtocol) throws SQLException {
...
// set new found connection as slave connection.
this.secondaryProtocol = newSecondaryProtocol;
if (urlParser.getOptions().assureReadOnly) {
setSessionReadOnly(true, this.secondaryProtocol);
}
resetSecondaryFailoverData();
}
위에서 얻어낸 Reader Endpoint 는 MasterSlaveListener.java 의 secondaryProtocol 로 저장된다. 당연히 Writer Endpoint 는 primaryProtocol 로 저장되어있다.
FYI) mariadb-connector-j 의 Protocol 과 Listener 의 상속관계는 아래와 같아서 MasterSlaveListener.java 에 저장된 primary, secondary protocol 은 이것을 상속받은 AuroraListener 에서 사용될 수 있다.
여기 까지 실행되면 쿼리 수행에 필요한 클래스및 커넥션이 모두 만들어진 상태가 된다.
쿼리를 실행해서 readOnly 에 따른 커넥션이 어떻게 달라지는지 확인해보자.
2. readOnly=false 인 상태로 쿼리 실행
stmt.execute("SELECT 1 /*readOnly=false*/;")
readOnly 가 false 인 상태로 Statement.execute() 를 호출해본다.
2.1. ConnectionPool 에서 MariaDbStatement 의 execute() 를 호출해준다.
@Override
public final boolean execute(String sql) throws SQLException {
...
try {
return stmt.execute(sql);
}
...
}
테스트 코드에서는 DruidCP 를 사용했기 때문에 DruidCP 에서 MariaDbStatement 의 execute() 가 호출됨.
2.2 MariaDbStatement 에서 protocol.executeQuery() 호출
private boolean executeInternal(String sql, int fetchSize, int autoGeneratedKeys)
throws SQLException {
...
protocol.executeQuery(
protocol.isMasterConnection(), results, getTimeoutSql(nativeSql(sql, protocol)));
...
}
이때 AuroraProtocol.java 에는 FailoverProxy.java 가 Proxy 로 걸려있기 때문에 FailoverProxy 의 invoke() 가 호출된다.
FailoverProxy.java 에는 AuroraProtocol.java 의 각 함수들이 호출되었을때 수행되어야하는 프록시 로직들이 swtich case 문으로 정의 되어있다.
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
switch (methodName) {
case METHOD_GET_LOCK:
return this.lock;
case METHOD_GET_NO_BACKSLASH:
return listener.noBackslashEscapes();
case METHOD_IS_MARIADB:
return listener.isServerMariaDb();
case METHOD_GET_CATALOG:
return listener.getCatalog();
case METHOD_GET_TIMEOUT:
return listener.getTimeout();
...
대부분 Listener 를 먼저 호출해준다.
@Override
public Object invoke(Method method, Object[] args) throws Throwable {
....
return method.invoke(currentProtocol, args);
}
MasterSlaveListener.java 에서 currentProtocol 이 masterProtocol 과 주소가 일치하며 currentProtocol.currentHost 가 dev56-a 이므로 Writer endpoint 가 사용되고있음을 알 수 있다.
MasterSlaveListener.java 에서 currentProtocol.executeQuery() 함수를 invoke 하였으므로 AbstarctQueryProtocol.java 의 executeQuery() 가 실행되었고 여기에서 실제 네트워크 작업이 실행된다.
@Override
public void executeQuery(boolean mustExecuteOnMaster, Results results, final String sql)
throws SQLException {
cmdPrologue();
try {
writer.startPacket(0);
writer.write(COM_QUERY);
writer.write(sql);
writer.flush();
getResult(results);
}
...
}
3. connection 의 readOnly=true 로 세팅
connection.setReadOnly(true);
위 테스트 코드를 통하여 readOnly=true 로 변경
public void setReadOnly(final boolean readOnly) throws SQLException {
...
protocol.setReadonly(readOnly);
...
}
connection 의 setReadOnly() 가 호출되면 MariaDbConnection.java 의 setReadOnly() 가 호출되고 이 함수 내부에서 protocol.setReadOnly() 가 호출된다.
Protocol 의 함수가 호출되면 어김없이 프록시인 FailoverProxy.java 의 invoke() 함수가 호출된다.
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
switch (methodName) {
....
case METHOD_SET_READ_ONLY:
this.listener.switchReadOnlyConnection((Boolean) args[0]);
...
FailoverPorxy 에서 setReadOnly 함수가 호출되면 listener.switchReadOnlyConnection() 를 호출해주게 된다.
이로 인해 MasterSlaveListener 의 아래 함수가 호출된다.
@Override
public void switchReadOnlyConnection(Boolean mustBeReadOnly) throws SQLException {
checkWaitingConnection();
if (currentReadOnlyAsked != mustBeReadOnly) {
proxy.lock.lock();
try {
currentReadOnlyAsked = mustBeReadOnly;
if (currentReadOnlyAsked) {
if (currentProtocol == null) {
// switching to secondary connection
currentProtocol = this.secondaryProtocol;
} else if (currentProtocol.isMasterConnection()) {
// must change to replica connection
if (!isSecondaryHostFail()) {
try {
// switching to secondary connection
syncConnection(this.masterProtocol, this.secondaryProtocol);
currentProtocol = this.secondaryProtocol;
// current connection is now secondary
return;
currentReadOnlyAsked 가 ture 이며 currentProtocol 이 MasterConnection 이므로 currentProtocol 를 secondayProtocol 로 세팅하게된다.
SecondayProtocol 에는 1번에서 확인한대로 Reader Machine DNS 가 저장되어있다.
4. readOnly=true 상태로 쿼리 실행
connection.createStatement().execute("SELECT 1 /*readOnly=true*/;");
readOnly=true 로 변경된 상태에서 위 쿼리를 실행해본다.
모든 과정이 2번과 동일하다.
다만 MasterSlaveListener.java 에서 invoke() 함수가 호출되었을때 this.currentProtocol 이 secondaryProtocol 로 변경되어있다.
currentProtocol 의 주소가 secondaryProtocol 과 동일하며 currentProtocol의 host 가 dev5602 이므로 Reader 로 쿼리가 호출되는 것을 확인할 수 있다.
결론
- mariadb-connector-j 는 aurora 모드일때 Writer endpoint 만 있으면 query 를 통해 나머지 클러스터 정보를 얻어올 수 있다.
- 각 모드별로 사용되는 Proxy 가 있으며 Aurora Mode 일때는 FailoverProxy 가 사용되어 Protocol 이 호출되기 전에 Listener 함수를 호출한다.
- MasterSlaveListener 에는 currentProtocol 를 맴버 변수로 가지고 있으며 currentProtocol 을 통해 DB 와 통신한다.
- default 로 currentProtocol 은 MasterProtocol 이다.
- connection.setReadOnly() 호출되면 currentProtocol 이 secondaryProtocol 로 변경된다.
Writer Endpoint 만 있으면 Aurora 클러스터 전체의 Instance 를 바라볼 수 있는 것이 흥미로워 코드를 분석하기 시작했다.
Reflection 의 Proxy 기능이 실제로 사용되는 부분이 인상깊었다.