diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/etl/events/ProductOrService.scala b/src/main/scala/fr/polytechnique/cmap/cnam/etl/events/ProductOrService.scala new file mode 100644 index 00000000..5238b4bc --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/etl/events/ProductOrService.scala @@ -0,0 +1,11 @@ +package fr.polytechnique.cmap.cnam.etl.events +import java.sql.Timestamp + +object ProductOrService extends ProductOrService + +trait ProductOrService extends AnyEvent with EventBuilder{ + override val category: EventCategory[ProductOrService] = "product_or_service" + + def apply(patientID: String, groupID: String, name: String, quantity: Double, date: Timestamp): Event[ProductOrService] = + Event(patientID, category, groupID, name, quantity, date, None) +} diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/acts/DcirMedicalActExtractor.scala b/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/acts/DcirMedicalActExtractor.scala index 7e2bd4f0..bf9691c9 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/acts/DcirMedicalActExtractor.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/acts/DcirMedicalActExtractor.scala @@ -3,13 +3,15 @@ package fr.polytechnique.cmap.cnam.etl.extractors.events.acts import java.sql.Timestamp + import scala.util.Try import org.apache.spark.sql.Row -import fr.polytechnique.cmap.cnam.etl.events.{BiologyDcirAct, DcirAct, EventBuilder, MedicalAct} +import fr.polytechnique.cmap.cnam.etl.events.{AnyEvent, BiologyDcirAct, DcirAct, EventBuilder, MedicalAct, ProductOrService} import fr.polytechnique.cmap.cnam.etl.extractors.StartsWithStrategy import fr.polytechnique.cmap.cnam.etl.extractors.codes.SimpleExtractorCodes import fr.polytechnique.cmap.cnam.etl.extractors.sources.dcir.DcirSimpleExtractor import fr.polytechnique.cmap.cnam.util.functions.makeTS +import fr.polytechnique.cmap.cnam.etl.events.AnyEvent /** * Gets all type of Acts from DCIR. @@ -19,8 +21,9 @@ import fr.polytechnique.cmap.cnam.util.functions.makeTS * the information is not available a default DCIRAct. * @param codes: List of Act codes to be tracked in the study or empty to get all the Acts. */ -abstract sealed class DcirRowActExtractor(codes: SimpleExtractorCodes) extends DcirSimpleExtractor[MedicalAct] - with StartsWithStrategy[MedicalAct] { +//TODO This abstract class should be an indepedent class +abstract class DcirRowActExtractor[EventType <: AnyEvent](codes: SimpleExtractorCodes) extends DcirSimpleExtractor[EventType] + with StartsWithStrategy[EventType] { final val PrivateInstitutionCodes = Set(4D, 5D, 6D, 7D) @@ -44,7 +47,7 @@ abstract sealed class DcirRowActExtractor(codes: SimpleExtractorCodes) extends D * @param r the row of DCIR to be investigated. * @return Try[String] */ - // TODO: REMOVE THIS + // TODO: Unification of extractGroupId in DCIR override def extractGroupId(r: Row): String = { Try { @@ -82,7 +85,7 @@ abstract sealed class DcirRowActExtractor(codes: SimpleExtractorCodes) extends D * @param codes: List of Act codes to be tracked in the study or empty to get all the Acts. */ final case class DcirMedicalActExtractor(codes: SimpleExtractorCodes) - extends DcirRowActExtractor(codes) { + extends DcirRowActExtractor[MedicalAct](codes) { // Implementation of the BasicExtractor Trait override val columnName: String = ColNames.CamCode override val eventBuilder: EventBuilder = DcirAct @@ -93,7 +96,7 @@ final case class DcirMedicalActExtractor(codes: SimpleExtractorCodes) * @param codes: List of Act codes to be tracked in the study or empty to get all the Acts. */ final case class DcirBiologyActExtractor(codes: SimpleExtractorCodes) - extends DcirRowActExtractor(codes) { + extends DcirRowActExtractor[MedicalAct](codes) { // Implementation of the BasicExtractor Trait override val columnName: String = ColNames.BioCode override val eventBuilder: EventBuilder = BiologyDcirAct @@ -101,4 +104,4 @@ final case class DcirBiologyActExtractor(codes: SimpleExtractorCodes) // Because BioCode is a Double override def extractValue(row: Row): String = row.getAs[Double](columnName).toString -} +} \ No newline at end of file diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/productsorservices/DcirProductOrServiceExtractor.scala b/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/productsorservices/DcirProductOrServiceExtractor.scala new file mode 100644 index 00000000..14d43719 --- /dev/null +++ b/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/productsorservices/DcirProductOrServiceExtractor.scala @@ -0,0 +1,25 @@ +// License: BSD 3 clause + +package fr.polytechnique.cmap.cnam.etl.extractors.events.productsorservices + +import fr.polytechnique.cmap.cnam.etl.events.{DcirAct, EventBuilder, ProductOrService} +import fr.polytechnique.cmap.cnam.etl.extractors.codes.SimpleExtractorCodes +import fr.polytechnique.cmap.cnam.etl.extractors.events.acts.DcirRowActExtractor +import org.apache.spark.sql.Row + +import scala.util.Try + +final case class DcirProductOrServiceExtractor(codes: SimpleExtractorCodes) + extends DcirRowActExtractor[ProductOrService](codes) { + override val columnName: String = ColNames.TipCode + override val eventBuilder: EventBuilder = ProductOrService + + // Implementation of the EventRowExtractor + override def usedColumns: List[String] = List( + ColNames.TipCode, ColNames.TipQuantity, + ColNames.InstitutionCode, ColNames.GHSCode, ColNames.Sector + ) ++ super.usedColumns + + // Number of products/services + override def extractWeight(r: Row): Double = r.getAs[Int](ColNames.TipQuantity).toDouble +} diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/sources/dcir/DcirSource.scala b/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/sources/dcir/DcirSource.scala index 52d59b66..0f1f0913 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/sources/dcir/DcirSource.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/sources/dcir/DcirSource.scala @@ -15,6 +15,8 @@ trait DcirSource extends ColumnNames { lazy val CamCode: String = "ER_CAM_F__CAM_PRS_IDE" lazy val BioCode: String = "ER_BIO_F__BIO_PRS_IDE" lazy val GHSCode: String = "ER_ETE_F__ETE_GHS_NUM" + lazy val TipCode: ColName = "ER_TIP_F__TIP_PRS_IDE" + lazy val TipQuantity: ColName = "ER_TIP_F__TIP_ACT_QSN" lazy val InstitutionCode: String = "ER_ETE_F__ETE_TYP_COD" lazy val Sector: String = "ER_ETE_F__PRS_PPU_SEC" lazy val NaturePrestation: ColName = "PRS_NAT_REF" @@ -26,7 +28,6 @@ trait DcirSource extends ColumnNames { lazy val FlowEmitterNumber: ColName = "FLX_EMT_ORD" lazy val OrgId: ColName = "ORG_CLE_NUM" lazy val OrderId: ColName = "DCT_ORD_NUM" - } } diff --git a/src/test/resources/DCIR/ER_TIP_F.csv b/src/test/resources/DCIR/ER_TIP_F.csv new file mode 100644 index 00000000..37dd2d9c --- /dev/null +++ b/src/test/resources/DCIR/ER_TIP_F.csv @@ -0,0 +1,3 @@ +LPP_ECT_MNT,LPP_ECU_MNT,TIP_ACL_DTD,TIP_ACL_DTF,TIP_ACT_PRU,TIP_ACT_QSN,TIP_ORD_NUM,TIP_PRS_IDE,TIP_PRS_TYP,TIP_PUB_PRX,TIP_SIR_NUM,ORG_CLE_NEW,DCT_ORD_NUM,FLX_DIS_DTD,FLX_EMT_NUM,FLX_EMT_ORD,FLX_EMT_TYP,FLX_TRT_DTD,ORG_CLE_NUM,PRS_ORD_NUM,REM_TYP_AFF +0,0,2006-02-01,2006-02-01,100,1,1,5589,6,100,12345,CODE1234,1000,2007-02-01,5,412,3,2007-01-10,CODE1234,5,2 +0,0,2006-02-01,2006-02-01,100,1,1,5600,6,100,12345,CODE1234,10,2006-02-01,3,2,1,2006-01-20,CODE1234,2,1 diff --git a/src/test/resources/test-input/DCIR_all.parquet b/src/test/resources/test-input/DCIR_all.parquet new file mode 100644 index 00000000..532f6d25 Binary files /dev/null and b/src/test/resources/test-input/DCIR_all.parquet differ diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/etl/events/ProductOrServiceSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/etl/events/ProductOrServiceSuite.scala new file mode 100644 index 00000000..6d30c88f --- /dev/null +++ b/src/test/scala/fr/polytechnique/cmap/cnam/etl/events/ProductOrServiceSuite.scala @@ -0,0 +1,26 @@ +// License: BSD 3 clause + +package fr.polytechnique.cmap.cnam.etl.events + +import java.sql.Timestamp + +import org.mockito.Mockito.mock +import org.scalatest.flatspec.AnyFlatSpec + +class ProductOrServiceSuite extends AnyFlatSpec { + + val patientID: String = "patientID" + val timestamp: Timestamp = mock(classOf[Timestamp]) + + "apply" should "allow creation of a Medical Act event" in { + + // Given + val expected = Event[ProductOrService](patientID, ProductOrService.category, "hosp", "C67", 0.0, timestamp, None) + + // When + val result = ProductOrService(patientID, "hosp", "C67", 0.0, timestamp) + + // Then + assert(result == expected) + } +} diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/acts/DcirBiologyActsSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/acts/DcirBiologyActsSuite.scala index b913a522..83a7f687 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/acts/DcirBiologyActsSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/acts/DcirBiologyActsSuite.scala @@ -181,7 +181,7 @@ class DcirBiologyActsSuite extends SharedContext { // Given val codes = SimpleExtractorCodes(List("238")) - val input = sqlCtx.read.parquet("src/test/resources/test-input/DCIR_w_BIO.parquet") + val input = sqlCtx.read.parquet("src/test/resources/test-input/DCIR_all.parquet") val sources = Sources(dcir = Some(input)) diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/prestations/PractitionerClaimSpecialityExtractorSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/prestations/PractitionerClaimSpecialityExtractorSuite.scala index fc12ddac..eff842fa 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/prestations/PractitionerClaimSpecialityExtractorSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/prestations/PractitionerClaimSpecialityExtractorSuite.scala @@ -19,7 +19,7 @@ class PractitionerClaimSpecialityExtractorSuite extends SharedContext { // Given val medicalSpeCodes = SimpleExtractorCodes(List("42")) - val input = spark.read.parquet("src/test/resources/test-input/DCIR_w_BIO.parquet") + val input = spark.read.parquet("src/test/resources/test-input/DCIR_all.parquet") val sources = Sources(dcir = Some(input)) val expected = Seq[Event[PractitionerClaimSpeciality]]( @@ -44,7 +44,7 @@ class PractitionerClaimSpecialityExtractorSuite extends SharedContext { // Given val nonMedicalSpeCodes = SimpleExtractorCodes(List("42")) - val input = spark.read.parquet("src/test/resources/test-input/DCIR_w_BIO.parquet") + val input = spark.read.parquet("src/test/resources/test-input/DCIR_all.parquet") val sources = Sources(dcir = Some(input)) val expected = Seq[Event[PractitionerClaimSpeciality]]( @@ -87,7 +87,7 @@ class PractitionerClaimSpecialityExtractorSuite extends SharedContext { import sqlCtx.implicits._ // Given - val input = spark.read.parquet("src/test/resources/test-input/DCIR_w_BIO.parquet") + val input = spark.read.parquet("src/test/resources/test-input/DCIR_all.parquet") val sources = Sources(dcir = Some(input)) val expected = Seq[Event[PractitionerClaimSpeciality]]( diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/productsorservices/DcirProductOrServiceSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/productsorservices/DcirProductOrServiceSuite.scala new file mode 100644 index 00000000..d73f9177 --- /dev/null +++ b/src/test/scala/fr/polytechnique/cmap/cnam/etl/extractors/events/productsorservices/DcirProductOrServiceSuite.scala @@ -0,0 +1,35 @@ +// License: BSD 3 clause + +package fr.polytechnique.cmap.cnam.etl.extractors.events.productsorservices + +import fr.polytechnique.cmap.cnam.SharedContext +import fr.polytechnique.cmap.cnam.etl.events._ +import fr.polytechnique.cmap.cnam.etl.extractors.codes.SimpleExtractorCodes +import fr.polytechnique.cmap.cnam.etl.sources.Sources +import fr.polytechnique.cmap.cnam.util.functions._ +import org.apache.spark.sql.DataFrame + +class DcirProductOrServiceSuite extends SharedContext { + + "extract" should "extract lpp products from raw data of the dcir (er_tip_f table)" in { + + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + // Given + val dcir: DataFrame = sqlCtx.read.load("src/test/resources/test-input/DCIR_all.parquet") + val input = Sources(dcir = Some(dcir)) + + val expected = Seq[Event[ProductOrService]]( + ProductOrService("Patient_01", "liberal", "5600", 1.0, makeTS(2006, 1, 15)), + ProductOrService("Patient_02", "liberal", "5589", 1.0, makeTS(2006, 1, 5)) + ).toDS + + // When + val result = DcirProductOrServiceExtractor(SimpleExtractorCodes.empty).extract(input) + + // Then + assertDSs(result, expected) + } +} +