Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import static com.google.common.collect.Maps.newHashMap;
Expand Down Expand Up @@ -52,10 +54,13 @@
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -80,9 +85,11 @@
import com.amazonaws.services.glue.model.GetDatabaseResult;
import com.amazonaws.services.glue.model.GetPartitionsResult;
import com.amazonaws.services.glue.model.InvalidInputException;
import com.amazonaws.services.glue.model.OperationTimeoutException;
import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateDatabaseRequest;
import com.amazonaws.services.glue.model.UpdatePartitionRequest;
import com.amazonaws.services.glue.model.UpdateTableRequest;
import com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -118,6 +125,8 @@ public class ApiaryGlueSyncTest {
private ArgumentCaptor<DeleteDatabaseRequest> deleteDatabaseRequestCaptor;
@Captor
private ArgumentCaptor<CreatePartitionRequest> createPartitionRequestCaptor;
@Captor
private ArgumentCaptor<UpdatePartitionRequest> updatePartitionRequestCaptor;

private final String tableName = "some_table";
private final String dbName = "some_db";
Expand Down Expand Up @@ -527,6 +536,245 @@ public void onAddPartition_withIncorrectFormat() throws MetaException {
assertThat(tableInput.getStorageDescriptor().getColumns().get(0).getComment(), is("incorrect_comment"));
}

@Test
public void onCreateHiveTableThatAlreadyExists() throws MetaException {
CreateTableEvent event = mock(CreateTableEvent.class);
when(event.getStatus()).thenReturn(true);
when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning()));
when(glueClient.createTable(any())).thenThrow(new AlreadyExistsException(""));

glueSync.onCreateTable(event);

verify(glueClient).createTable(any());
verify(glueClient).updateTable(updateTableRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS);
assertThat(updateTableRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName));
assertThat(updateTableRequestCaptor.getValue().getTableInput().getName(), is(tableName));
}

@Test
public void onAlterHiveTableThatDoesntExistInGlue() throws MetaException {
AlterTableEvent event = mock(AlterTableEvent.class);
when(event.getStatus()).thenReturn(true);
Table table = simpleHiveTable(simpleSchema(), simplePartitioning());
when(event.getOldTable()).thenReturn(table);
when(event.getNewTable()).thenReturn(table);
when(glueClient.updateTable(any())).thenThrow(new EntityNotFoundException(""));

glueSync.onAlterTable(event);

verify(glueClient).updateTable(any());
verify(glueClient).createTable(createTableRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS);
assertThat(createTableRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName));
assertThat(createTableRequestCaptor.getValue().getTableInput().getName(), is(tableName));
}

@Test
public void onAlterPartition() throws MetaException {
AlterPartitionEvent event = mock(AlterPartitionEvent.class);
when(event.getStatus()).thenReturn(true);
Table table = simpleHiveTable(simpleSchema(), simplePartitioning());
Partition partition = new Partition();
partition.setValues(Arrays.asList("part1Value", "part2Value"));
partition.setSd(table.getSd());
when(event.getTable()).thenReturn(table);
when(event.getNewPartition()).thenReturn(partition);

glueSync.onAlterPartition(event);

verify(glueClient).updatePartition(updatePartitionRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS);
assertThat(updatePartitionRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName));
assertThat(updatePartitionRequestCaptor.getValue().getTableName(), is(tableName));
}

@Test
public void onAlterPartitionThatDoesntExistInGlue() throws MetaException {
AlterPartitionEvent event = mock(AlterPartitionEvent.class);
when(event.getStatus()).thenReturn(true);
Table table = simpleHiveTable(simpleSchema(), simplePartitioning());
Partition partition = new Partition();
partition.setValues(Arrays.asList("part1Value", "part2Value"));
partition.setSd(table.getSd());
when(event.getTable()).thenReturn(table);
when(event.getNewPartition()).thenReturn(partition);
when(glueClient.updatePartition(any())).thenThrow(new EntityNotFoundException(""));

glueSync.onAlterPartition(event);

verify(glueClient).updatePartition(any());
verify(glueClient).createPartition(createPartitionRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS);
assertThat(createPartitionRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName));
assertThat(createPartitionRequestCaptor.getValue().getTableName(), is(tableName));
}

@Test
public void onAddPartition() throws MetaException {
AddPartitionEvent event = mock(AddPartitionEvent.class);
when(event.getStatus()).thenReturn(true);
Table table = simpleHiveTable(simpleSchema(), simplePartitioning());
Partition partition = new Partition();
partition.setValues(Arrays.asList("part1Value", "part2Value"));
partition.setSd(table.getSd());
when(event.getTable()).thenReturn(table);
when(event.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator());

glueSync.onAddPartition(event);

verify(glueClient).createPartition(createPartitionRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS);
assertThat(createPartitionRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName));
assertThat(createPartitionRequestCaptor.getValue().getTableName(), is(tableName));
}

@Test
public void onAddPartitionThatAlreadyExists() throws MetaException {
AddPartitionEvent event = mock(AddPartitionEvent.class);
when(event.getStatus()).thenReturn(true);
Table table = simpleHiveTable(simpleSchema(), simplePartitioning());
Partition partition = new Partition();
partition.setValues(Arrays.asList("part1Value", "part2Value"));
partition.setSd(table.getSd());
when(event.getTable()).thenReturn(table);
when(event.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator());
when(glueClient.createPartition(any())).thenThrow(new AlreadyExistsException(""));

glueSync.onAddPartition(event);

verify(glueClient).createPartition(any());
verify(glueClient).updatePartition(updatePartitionRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS);
assertThat(updatePartitionRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName));
assertThat(updatePartitionRequestCaptor.getValue().getTableName(), is(tableName));
}

@Test
public void onDropTable() throws MetaException {
DropTableEvent event = mock(DropTableEvent.class);
when(event.getStatus()).thenReturn(true);
when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning()));

glueSync.onDropTable(event);

verify(glueClient).deleteTable(deleteTableRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS);
assertThat(deleteTableRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName));
assertThat(deleteTableRequestCaptor.getValue().getName(), is(tableName));
}

@Test
public void onDropTableThatDoesntExistInGlue() throws MetaException {
DropTableEvent event = mock(DropTableEvent.class);
when(event.getStatus()).thenReturn(true);
when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning()));
when(glueClient.deleteTable(any())).thenThrow(new EntityNotFoundException(""));

glueSync.onDropTable(event);

verify(glueClient).deleteTable(any());
verifyNoMoreInteractions(metricService);
}

@Test
public void onDropPartition() throws MetaException {
DropPartitionEvent event = mock(DropPartitionEvent.class);
when(event.getStatus()).thenReturn(true);
Table table = simpleHiveTable(simpleSchema(), simplePartitioning());
Partition partition = new Partition();
partition.setValues(Arrays.asList("part1Value", "part2Value"));
partition.setSd(table.getSd());
when(event.getTable()).thenReturn(table);
when(event.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator());

glueSync.onDropPartition(event);

verify(glueClient).deletePartition(any());
verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS);
}

@Test
public void allHandlers_ignoredWhenEventStatusFalse() throws MetaException {
CreateDatabaseEvent createDb = mock(CreateDatabaseEvent.class);
DropDatabaseEvent dropDb = mock(DropDatabaseEvent.class);
CreateTableEvent createTable = mock(CreateTableEvent.class);
DropTableEvent dropTable = mock(DropTableEvent.class);
AlterTableEvent alterTable = mock(AlterTableEvent.class);
AddPartitionEvent addPartition = mock(AddPartitionEvent.class);
DropPartitionEvent dropPartition = mock(DropPartitionEvent.class);
AlterPartitionEvent alterPartition = mock(AlterPartitionEvent.class);
when(createDb.getStatus()).thenReturn(false);
when(dropDb.getStatus()).thenReturn(false);
when(createTable.getStatus()).thenReturn(false);
when(dropTable.getStatus()).thenReturn(false);
when(alterTable.getStatus()).thenReturn(false);
when(addPartition.getStatus()).thenReturn(false);
when(dropPartition.getStatus()).thenReturn(false);
when(alterPartition.getStatus()).thenReturn(false);

glueSync.onCreateDatabase(createDb);
glueSync.onDropDatabase(dropDb);
glueSync.onCreateTable(createTable);
glueSync.onDropTable(dropTable);
glueSync.onAlterTable(alterTable);
glueSync.onAddPartition(addPartition);
glueSync.onDropPartition(dropPartition);
glueSync.onAlterPartition(alterPartition);

verifyZeroInteractions(glueClient);
verifyNoMoreInteractions(metricService);
}

@Test
public void onCreateTable_failureMetricsRecordedOnException() throws MetaException {
CreateTableEvent event = mock(CreateTableEvent.class);
when(event.getStatus()).thenReturn(true);
when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning()));
when(glueClient.createTable(any())).thenThrow(new OperationTimeoutException("timeout"));

glueSync.onCreateTable(event);

verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_FAILURE);
verifyNoMoreInteractions(metricService);
}

@Test
public void onCreateTable_exceptionRethrownWhenThrowExceptionsEnabled() throws MetaException {
ApiaryGlueSync throwingSync = new ApiaryGlueSync(configuration, glueClient, gluePrefix, metricService, true);
CreateTableEvent event = mock(CreateTableEvent.class);
when(event.getStatus()).thenReturn(true);
when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning()));
when(glueClient.createTable(any())).thenThrow(new OperationTimeoutException("timeout"));

try {
throwingSync.onCreateTable(event);
fail("Expected MetaException");
} catch (MetaException e) {
// expected
}

verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_FAILURE);
}

@Test
public void onAlterIcebergTable_RenameTableSkipsRenameOperation() throws MetaException {
AlterTableEvent event = mock(AlterTableEvent.class);
when(event.getStatus()).thenReturn(true);
Table oldTable = simpleIcebergTable(dbName, tableName, simpleIcebergSchema(), simpleIcebergPartitionSpec(), null);
Table newTable = simpleIcebergTable(dbName, "table_renamed", simpleIcebergSchema(), simpleIcebergPartitionSpec(), null);
when(event.getOldTable()).thenReturn(oldTable);
when(event.getNewTable()).thenReturn(newTable);

glueSync.onAlterTable(event);

verify(glueClient).updateTable(updateTableRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS);
assertThat(updateTableRequestCaptor.getValue().getTableInput().getName(), is("table_renamed"));
verify(glueClient, times(0)).deleteTable(any());
verify(glueClient, times(0)).batchCreatePartition(any());
}

private Table simpleHiveTable(List<FieldSchema> schema, List<FieldSchema> partitions) {
Table table = new Table();
table.setTableName(tableName);
Expand Down
Loading