diff --git a/hive-event-listeners/apiary-gluesync-listener/src/test/java/com/expediagroup/apiary/extensions/gluesync/listener/ApiaryGlueSyncTest.java b/hive-event-listeners/apiary-gluesync-listener/src/test/java/com/expediagroup/apiary/extensions/gluesync/listener/ApiaryGlueSyncTest.java index 6d81757c..aca9cc66 100644 --- a/hive-event-listeners/apiary-gluesync-listener/src/test/java/com/expediagroup/apiary/extensions/gluesync/listener/ApiaryGlueSyncTest.java +++ b/hive-event-listeners/apiary-gluesync-listener/src/test/java/com/expediagroup/apiary/extensions/gluesync/listener/ApiaryGlueSyncTest.java @@ -19,12 +19,14 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static com.google.common.collect.Maps.newHashMap; @@ -52,10 +54,13 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.serde.serdeConstants; import org.junit.Before; import org.junit.Test; @@ -80,9 +85,11 @@ import com.amazonaws.services.glue.model.GetDatabaseResult; import com.amazonaws.services.glue.model.GetPartitionsResult; import com.amazonaws.services.glue.model.InvalidInputException; +import com.amazonaws.services.glue.model.OperationTimeoutException; import com.amazonaws.services.glue.model.PartitionInput; import com.amazonaws.services.glue.model.TableInput; import com.amazonaws.services.glue.model.UpdateDatabaseRequest; +import com.amazonaws.services.glue.model.UpdatePartitionRequest; import com.amazonaws.services.glue.model.UpdateTableRequest; import com.google.common.collect.ImmutableMap; @@ -118,6 +125,8 @@ public class ApiaryGlueSyncTest { private ArgumentCaptor deleteDatabaseRequestCaptor; @Captor private ArgumentCaptor createPartitionRequestCaptor; + @Captor + private ArgumentCaptor updatePartitionRequestCaptor; private final String tableName = "some_table"; private final String dbName = "some_db"; @@ -527,6 +536,245 @@ public void onAddPartition_withIncorrectFormat() throws MetaException { assertThat(tableInput.getStorageDescriptor().getColumns().get(0).getComment(), is("incorrect_comment")); } + @Test + public void onCreateHiveTableThatAlreadyExists() throws MetaException { + CreateTableEvent event = mock(CreateTableEvent.class); + when(event.getStatus()).thenReturn(true); + when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning())); + when(glueClient.createTable(any())).thenThrow(new AlreadyExistsException("")); + + glueSync.onCreateTable(event); + + verify(glueClient).createTable(any()); + verify(glueClient).updateTable(updateTableRequestCaptor.capture()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS); + assertThat(updateTableRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName)); + assertThat(updateTableRequestCaptor.getValue().getTableInput().getName(), is(tableName)); + } + + @Test + public void onAlterHiveTableThatDoesntExistInGlue() throws MetaException { + AlterTableEvent event = mock(AlterTableEvent.class); + when(event.getStatus()).thenReturn(true); + Table table = simpleHiveTable(simpleSchema(), simplePartitioning()); + when(event.getOldTable()).thenReturn(table); + when(event.getNewTable()).thenReturn(table); + when(glueClient.updateTable(any())).thenThrow(new EntityNotFoundException("")); + + glueSync.onAlterTable(event); + + verify(glueClient).updateTable(any()); + verify(glueClient).createTable(createTableRequestCaptor.capture()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS); + assertThat(createTableRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName)); + assertThat(createTableRequestCaptor.getValue().getTableInput().getName(), is(tableName)); + } + + @Test + public void onAlterPartition() throws MetaException { + AlterPartitionEvent event = mock(AlterPartitionEvent.class); + when(event.getStatus()).thenReturn(true); + Table table = simpleHiveTable(simpleSchema(), simplePartitioning()); + Partition partition = new Partition(); + partition.setValues(Arrays.asList("part1Value", "part2Value")); + partition.setSd(table.getSd()); + when(event.getTable()).thenReturn(table); + when(event.getNewPartition()).thenReturn(partition); + + glueSync.onAlterPartition(event); + + verify(glueClient).updatePartition(updatePartitionRequestCaptor.capture()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS); + assertThat(updatePartitionRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName)); + assertThat(updatePartitionRequestCaptor.getValue().getTableName(), is(tableName)); + } + + @Test + public void onAlterPartitionThatDoesntExistInGlue() throws MetaException { + AlterPartitionEvent event = mock(AlterPartitionEvent.class); + when(event.getStatus()).thenReturn(true); + Table table = simpleHiveTable(simpleSchema(), simplePartitioning()); + Partition partition = new Partition(); + partition.setValues(Arrays.asList("part1Value", "part2Value")); + partition.setSd(table.getSd()); + when(event.getTable()).thenReturn(table); + when(event.getNewPartition()).thenReturn(partition); + when(glueClient.updatePartition(any())).thenThrow(new EntityNotFoundException("")); + + glueSync.onAlterPartition(event); + + verify(glueClient).updatePartition(any()); + verify(glueClient).createPartition(createPartitionRequestCaptor.capture()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS); + assertThat(createPartitionRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName)); + assertThat(createPartitionRequestCaptor.getValue().getTableName(), is(tableName)); + } + + @Test + public void onAddPartition() throws MetaException { + AddPartitionEvent event = mock(AddPartitionEvent.class); + when(event.getStatus()).thenReturn(true); + Table table = simpleHiveTable(simpleSchema(), simplePartitioning()); + Partition partition = new Partition(); + partition.setValues(Arrays.asList("part1Value", "part2Value")); + partition.setSd(table.getSd()); + when(event.getTable()).thenReturn(table); + when(event.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator()); + + glueSync.onAddPartition(event); + + verify(glueClient).createPartition(createPartitionRequestCaptor.capture()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS); + assertThat(createPartitionRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName)); + assertThat(createPartitionRequestCaptor.getValue().getTableName(), is(tableName)); + } + + @Test + public void onAddPartitionThatAlreadyExists() throws MetaException { + AddPartitionEvent event = mock(AddPartitionEvent.class); + when(event.getStatus()).thenReturn(true); + Table table = simpleHiveTable(simpleSchema(), simplePartitioning()); + Partition partition = new Partition(); + partition.setValues(Arrays.asList("part1Value", "part2Value")); + partition.setSd(table.getSd()); + when(event.getTable()).thenReturn(table); + when(event.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator()); + when(glueClient.createPartition(any())).thenThrow(new AlreadyExistsException("")); + + glueSync.onAddPartition(event); + + verify(glueClient).createPartition(any()); + verify(glueClient).updatePartition(updatePartitionRequestCaptor.capture()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS); + assertThat(updatePartitionRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName)); + assertThat(updatePartitionRequestCaptor.getValue().getTableName(), is(tableName)); + } + + @Test + public void onDropTable() throws MetaException { + DropTableEvent event = mock(DropTableEvent.class); + when(event.getStatus()).thenReturn(true); + when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning())); + + glueSync.onDropTable(event); + + verify(glueClient).deleteTable(deleteTableRequestCaptor.capture()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS); + assertThat(deleteTableRequestCaptor.getValue().getDatabaseName(), is(gluePrefix + dbName)); + assertThat(deleteTableRequestCaptor.getValue().getName(), is(tableName)); + } + + @Test + public void onDropTableThatDoesntExistInGlue() throws MetaException { + DropTableEvent event = mock(DropTableEvent.class); + when(event.getStatus()).thenReturn(true); + when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning())); + when(glueClient.deleteTable(any())).thenThrow(new EntityNotFoundException("")); + + glueSync.onDropTable(event); + + verify(glueClient).deleteTable(any()); + verifyNoMoreInteractions(metricService); + } + + @Test + public void onDropPartition() throws MetaException { + DropPartitionEvent event = mock(DropPartitionEvent.class); + when(event.getStatus()).thenReturn(true); + Table table = simpleHiveTable(simpleSchema(), simplePartitioning()); + Partition partition = new Partition(); + partition.setValues(Arrays.asList("part1Value", "part2Value")); + partition.setSd(table.getSd()); + when(event.getTable()).thenReturn(table); + when(event.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator()); + + glueSync.onDropPartition(event); + + verify(glueClient).deletePartition(any()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_PARTITION_SUCCESS); + } + + @Test + public void allHandlers_ignoredWhenEventStatusFalse() throws MetaException { + CreateDatabaseEvent createDb = mock(CreateDatabaseEvent.class); + DropDatabaseEvent dropDb = mock(DropDatabaseEvent.class); + CreateTableEvent createTable = mock(CreateTableEvent.class); + DropTableEvent dropTable = mock(DropTableEvent.class); + AlterTableEvent alterTable = mock(AlterTableEvent.class); + AddPartitionEvent addPartition = mock(AddPartitionEvent.class); + DropPartitionEvent dropPartition = mock(DropPartitionEvent.class); + AlterPartitionEvent alterPartition = mock(AlterPartitionEvent.class); + when(createDb.getStatus()).thenReturn(false); + when(dropDb.getStatus()).thenReturn(false); + when(createTable.getStatus()).thenReturn(false); + when(dropTable.getStatus()).thenReturn(false); + when(alterTable.getStatus()).thenReturn(false); + when(addPartition.getStatus()).thenReturn(false); + when(dropPartition.getStatus()).thenReturn(false); + when(alterPartition.getStatus()).thenReturn(false); + + glueSync.onCreateDatabase(createDb); + glueSync.onDropDatabase(dropDb); + glueSync.onCreateTable(createTable); + glueSync.onDropTable(dropTable); + glueSync.onAlterTable(alterTable); + glueSync.onAddPartition(addPartition); + glueSync.onDropPartition(dropPartition); + glueSync.onAlterPartition(alterPartition); + + verifyZeroInteractions(glueClient); + verifyNoMoreInteractions(metricService); + } + + @Test + public void onCreateTable_failureMetricsRecordedOnException() throws MetaException { + CreateTableEvent event = mock(CreateTableEvent.class); + when(event.getStatus()).thenReturn(true); + when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning())); + when(glueClient.createTable(any())).thenThrow(new OperationTimeoutException("timeout")); + + glueSync.onCreateTable(event); + + verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_FAILURE); + verifyNoMoreInteractions(metricService); + } + + @Test + public void onCreateTable_exceptionRethrownWhenThrowExceptionsEnabled() throws MetaException { + ApiaryGlueSync throwingSync = new ApiaryGlueSync(configuration, glueClient, gluePrefix, metricService, true); + CreateTableEvent event = mock(CreateTableEvent.class); + when(event.getStatus()).thenReturn(true); + when(event.getTable()).thenReturn(simpleHiveTable(simpleSchema(), simplePartitioning())); + when(glueClient.createTable(any())).thenThrow(new OperationTimeoutException("timeout")); + + try { + throwingSync.onCreateTable(event); + fail("Expected MetaException"); + } catch (MetaException e) { + // expected + } + + verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_FAILURE); + } + + @Test + public void onAlterIcebergTable_RenameTableSkipsRenameOperation() throws MetaException { + AlterTableEvent event = mock(AlterTableEvent.class); + when(event.getStatus()).thenReturn(true); + Table oldTable = simpleIcebergTable(dbName, tableName, simpleIcebergSchema(), simpleIcebergPartitionSpec(), null); + Table newTable = simpleIcebergTable(dbName, "table_renamed", simpleIcebergSchema(), simpleIcebergPartitionSpec(), null); + when(event.getOldTable()).thenReturn(oldTable); + when(event.getNewTable()).thenReturn(newTable); + + glueSync.onAlterTable(event); + + verify(glueClient).updateTable(updateTableRequestCaptor.capture()); + verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS); + assertThat(updateTableRequestCaptor.getValue().getTableInput().getName(), is("table_renamed")); + verify(glueClient, times(0)).deleteTable(any()); + verify(glueClient, times(0)).batchCreatePartition(any()); + } + private Table simpleHiveTable(List schema, List partitions) { Table table = new Table(); table.setTableName(tableName);