diff --git a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BUILD b/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BUILD deleted file mode 100644 index 5c3030625..000000000 --- a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BUILD +++ /dev/null @@ -1,13 +0,0 @@ -scala_library( - sources = [ - "*.scala", - ], - tags = ["bazel-compatible"], - dependencies = [ - "client-events/thrift/src/thrift/storage/twitter/behavioral_event:behavioral_event-scala", - "kafka/finagle-kafka/finatra-kafka/src/main/scala", - "unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter:base", - "unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/common", - "unified_user_actions/thrift/src/main/thrift/com/twitter/unified_user_actions:unified_user_actions-scala", - ], -) diff --git a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BaseBCEAdapter.scala b/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BaseBCEAdapter.scala deleted file mode 100644 index ba81e9469..000000000 --- a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BaseBCEAdapter.scala +++ /dev/null @@ -1,96 +0,0 @@ -package com.twitter.unified_user_actions.adapter.behavioral_client_event - -import com.twitter.client_event_entities.serverside_context_key.latest.thriftscala.FlattenedServersideContextKey -import com.twitter.storage.behavioral_event.thriftscala.EventLogContext -import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog -import com.twitter.unified_user_actions.adapter.common.AdapterUtils -import com.twitter.unified_user_actions.thriftscala.ActionType -import com.twitter.unified_user_actions.thriftscala.BreadcrumbTweet -import com.twitter.unified_user_actions.thriftscala.ClientEventNamespace -import com.twitter.unified_user_actions.thriftscala.EventMetadata -import com.twitter.unified_user_actions.thriftscala.Item -import com.twitter.unified_user_actions.thriftscala.ProductSurface -import com.twitter.unified_user_actions.thriftscala.ProductSurfaceInfo -import com.twitter.unified_user_actions.thriftscala.SourceLineage -import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction -import com.twitter.unified_user_actions.thriftscala.UserIdentifier - -case class ProductSurfaceRelated( - productSurface: Option[ProductSurface], - productSurfaceInfo: Option[ProductSurfaceInfo]) - -trait BaseBCEAdapter { - def toUUA(e: FlattenedEventLog): Seq[UnifiedUserAction] - - protected def getUserIdentifier(c: EventLogContext): UserIdentifier = - UserIdentifier( - userId = c.userId, - guestIdMarketing = c.guestIdMarketing - ) - - protected def getEventMetadata(e: FlattenedEventLog): EventMetadata = - EventMetadata( - sourceLineage = SourceLineage.BehavioralClientEvents, - sourceTimestampMs = - e.context.driftAdjustedEventCreatedAtMs.getOrElse(e.context.eventCreatedAtMs), - receivedTimestampMs = AdapterUtils.currentTimestampMs, - // Client UI language or from Gizmoduck which is what user set in Twitter App. - // Please see more at https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/finatra-internal/international/src/main/scala/com/twitter/finatra/international/LanguageIdentifier.scala - // The format should be ISO 639-1. - language = e.context.languageCode.map(AdapterUtils.normalizeLanguageCode), - // Country code could be IP address (geoduck) or User registration country (gizmoduck) and the former takes precedence. - // We don’t know exactly which one is applied, unfortunately, - // see https://sourcegraph.twitter.biz/git.twitter.biz/source/-/blob/finatra-internal/international/src/main/scala/com/twitter/finatra/international/CountryIdentifier.scala - // The format should be ISO_3166-1_alpha-2. - countryCode = e.context.countryCode.map(AdapterUtils.normalizeCountryCode), - clientAppId = e.context.clientApplicationId, - clientVersion = e.context.clientVersion, - clientPlatform = e.context.clientPlatform, - viewHierarchy = e.v1ViewTypeHierarchy, - clientEventNamespace = Some( - ClientEventNamespace( - page = e.page, - section = e.section, - element = e.element, - action = e.actionName, - subsection = e.subsection - )), - breadcrumbViews = e.v1BreadcrumbViewTypeHierarchy, - breadcrumbTweets = e.v1BreadcrumbTweetIds.map { breadcrumbs => - breadcrumbs.map { breadcrumb => - BreadcrumbTweet( - tweetId = breadcrumb.serversideContextId.toLong, - sourceComponent = breadcrumb.sourceComponent) - } - } - ) - - protected def getBreadcrumbTweetIds( - breadcrumbTweetIds: Option[Seq[FlattenedServersideContextKey]] - ): Seq[BreadcrumbTweet] = - breadcrumbTweetIds - .getOrElse(Nil).map(breadcrumb => { - BreadcrumbTweet( - tweetId = breadcrumb.serversideContextId.toLong, - sourceComponent = breadcrumb.sourceComponent) - }) - - protected def getBreadcrumbViews(breadcrumbView: Option[Seq[String]]): Seq[String] = - breadcrumbView.getOrElse(Nil) - - protected def getUnifiedUserAction( - event: FlattenedEventLog, - actionType: ActionType, - item: Item, - productSurface: Option[ProductSurface] = None, - productSurfaceInfo: Option[ProductSurfaceInfo] = None - ): UnifiedUserAction = - UnifiedUserAction( - userIdentifier = getUserIdentifier(event.context), - actionType = actionType, - item = item, - eventMetadata = getEventMetadata(event), - productSurface = productSurface, - productSurfaceInfo = productSurfaceInfo - ) -} diff --git a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BehavioralClientEventAdapter.scala b/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BehavioralClientEventAdapter.scala deleted file mode 100644 index f2dbb5917..000000000 --- a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/BehavioralClientEventAdapter.scala +++ /dev/null @@ -1,39 +0,0 @@ -package com.twitter.unified_user_actions.adapter.behavioral_client_event - -import com.twitter.finagle.stats.NullStatsReceiver -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finatra.kafka.serde.UnKeyed -import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog -import com.twitter.unified_user_actions.adapter.AbstractAdapter -import com.twitter.unified_user_actions.thriftscala._ - -class BehavioralClientEventAdapter - extends AbstractAdapter[FlattenedEventLog, UnKeyed, UnifiedUserAction] { - - import BehavioralClientEventAdapter._ - - override def adaptOneToKeyedMany( - input: FlattenedEventLog, - statsReceiver: StatsReceiver = NullStatsReceiver - ): Seq[(UnKeyed, UnifiedUserAction)] = - adaptEvent(input).map { e => (UnKeyed, e) } -} - -object BehavioralClientEventAdapter { - def adaptEvent(e: FlattenedEventLog): Seq[UnifiedUserAction] = - // See go/bcecoverage for event namespaces, usage and coverage details - Option(e) - .map { e => - (e.page, e.actionName) match { - case (Some("tweet_details"), Some("impress")) => - TweetImpressionBCEAdapter.TweetDetails.toUUA(e) - case (Some("fullscreen_video"), Some("impress")) => - TweetImpressionBCEAdapter.FullscreenVideo.toUUA(e) - case (Some("fullscreen_image"), Some("impress")) => - TweetImpressionBCEAdapter.FullscreenImage.toUUA(e) - case (Some("profile"), Some("impress")) => - ProfileImpressionBCEAdapter.Profile.toUUA(e) - case _ => Nil - } - }.getOrElse(Nil) -} diff --git a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/ImpressionBCEAdapter.scala b/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/ImpressionBCEAdapter.scala deleted file mode 100644 index 4c608c8c6..000000000 --- a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/ImpressionBCEAdapter.scala +++ /dev/null @@ -1,34 +0,0 @@ -package com.twitter.unified_user_actions.adapter.behavioral_client_event - -import com.twitter.client.behavioral_event.action.impress.latest.thriftscala.Impress -import com.twitter.client_event_entities.serverside_context_key.latest.thriftscala.FlattenedServersideContextKey -import com.twitter.unified_user_actions.thriftscala.Item - -trait ImpressionBCEAdapter extends BaseBCEAdapter { - type ImpressedItem <: Item - - def getImpressedItem( - context: FlattenedServersideContextKey, - impression: Impress - ): ImpressedItem - - /** - * The start time of an impression in milliseconds since epoch. In BCE, the impression - * tracking clock will start immediately after the page is visible with no initial delay. - */ - def getImpressedStartTimestamp(impression: Impress): Long = - impression.visibilityPctDwellStartMs - - /** - * The end time of an impression in milliseconds since epoch. In BCE, the impression - * tracking clock will end before the user exit the page. - */ - def getImpressedEndTimestamp(impression: Impress): Long = - impression.visibilityPctDwellEndMs - - /** - * The UI component that hosted the impressed item. - */ - def getImpressedUISourceComponent(context: FlattenedServersideContextKey): String = - context.sourceComponent -} diff --git a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/ProfileImpressionBCEAdapter.scala b/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/ProfileImpressionBCEAdapter.scala deleted file mode 100644 index ef072f1b1..000000000 --- a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/ProfileImpressionBCEAdapter.scala +++ /dev/null @@ -1,52 +0,0 @@ -package com.twitter.unified_user_actions.adapter.behavioral_client_event - -import com.twitter.client.behavioral_event.action.impress.latest.thriftscala.Impress -import com.twitter.client_event_entities.serverside_context_key.latest.thriftscala.FlattenedServersideContextKey -import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog -import com.twitter.unified_user_actions.thriftscala.ActionType -import com.twitter.unified_user_actions.thriftscala.ClientProfileV2Impression -import com.twitter.unified_user_actions.thriftscala.Item -import com.twitter.unified_user_actions.thriftscala.ProductSurface -import com.twitter.unified_user_actions.thriftscala.ProfileActionInfo -import com.twitter.unified_user_actions.thriftscala.ProfileInfo -import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction - -object ProfileImpressionBCEAdapter { - val Profile = new ProfileImpressionBCEAdapter() -} - -class ProfileImpressionBCEAdapter extends ImpressionBCEAdapter { - override type ImpressedItem = Item.ProfileInfo - - override def toUUA(e: FlattenedEventLog): Seq[UnifiedUserAction] = - (e.v2Impress, e.v1UserIds) match { - case (Some(v2Impress), Some(v1UserIds)) => - v1UserIds.map { user => - getUnifiedUserAction( - event = e, - actionType = ActionType.ClientProfileV2Impression, - item = getImpressedItem(user, v2Impress), - productSurface = Some(ProductSurface.ProfilePage) - ) - } - case _ => Nil - } - - override def getImpressedItem( - context: FlattenedServersideContextKey, - impression: Impress - ): ImpressedItem = - Item.ProfileInfo( - ProfileInfo( - actionProfileId = context.serversideContextId.toLong, - profileActionInfo = Some( - ProfileActionInfo.ClientProfileV2Impression( - ClientProfileV2Impression( - impressStartTimestampMs = getImpressedStartTimestamp(impression), - impressEndTimestampMs = getImpressedEndTimestamp(impression), - sourceComponent = getImpressedUISourceComponent(context) - ) - ) - ) - )) -} diff --git a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/TweetImpressionBCEAdapter.scala b/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/TweetImpressionBCEAdapter.scala deleted file mode 100644 index f7d51900b..000000000 --- a/unified_user_actions/adapter/src/main/scala/com/twitter/unified_user_actions/adapter/behavioral_client_event/TweetImpressionBCEAdapter.scala +++ /dev/null @@ -1,84 +0,0 @@ -package com.twitter.unified_user_actions.adapter.behavioral_client_event - -import com.twitter.client.behavioral_event.action.impress.latest.thriftscala.Impress -import com.twitter.client_event_entities.serverside_context_key.latest.thriftscala.FlattenedServersideContextKey -import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog -import com.twitter.unified_user_actions.thriftscala.ActionType -import com.twitter.unified_user_actions.thriftscala.ClientTweetV2Impression -import com.twitter.unified_user_actions.thriftscala.Item -import com.twitter.unified_user_actions.thriftscala.ProductSurface -import com.twitter.unified_user_actions.thriftscala.TweetActionInfo -import com.twitter.unified_user_actions.thriftscala.TweetInfo -import com.twitter.unified_user_actions.thriftscala.UnifiedUserAction - -object TweetImpressionBCEAdapter { - val TweetDetails = new TweetImpressionBCEAdapter(ActionType.ClientTweetV2Impression) - val FullscreenVideo = new TweetImpressionBCEAdapter( - ActionType.ClientTweetVideoFullscreenV2Impression) - val FullscreenImage = new TweetImpressionBCEAdapter( - ActionType.ClientTweetImageFullscreenV2Impression) -} - -class TweetImpressionBCEAdapter(actionType: ActionType) extends ImpressionBCEAdapter { - override type ImpressedItem = Item.TweetInfo - - override def toUUA(e: FlattenedEventLog): Seq[UnifiedUserAction] = - (actionType, e.v2Impress, e.v1TweetIds, e.v1BreadcrumbTweetIds) match { - case (ActionType.ClientTweetV2Impression, Some(v2Impress), Some(v1TweetIds), _) => - toUUAEvents(e, v2Impress, v1TweetIds) - case ( - ActionType.ClientTweetVideoFullscreenV2Impression, - Some(v2Impress), - _, - Some(v1BreadcrumbTweetIds)) => - toUUAEvents(e, v2Impress, v1BreadcrumbTweetIds) - case ( - ActionType.ClientTweetImageFullscreenV2Impression, - Some(v2Impress), - _, - Some(v1BreadcrumbTweetIds)) => - toUUAEvents(e, v2Impress, v1BreadcrumbTweetIds) - case _ => Nil - } - - private def toUUAEvents( - e: FlattenedEventLog, - v2Impress: Impress, - v1TweetIds: Seq[FlattenedServersideContextKey] - ): Seq[UnifiedUserAction] = - v1TweetIds.map { tweet => - getUnifiedUserAction( - event = e, - actionType = actionType, - item = getImpressedItem(tweet, v2Impress), - productSurface = getProductSurfaceRelated.productSurface, - productSurfaceInfo = getProductSurfaceRelated.productSurfaceInfo - ) - } - - override def getImpressedItem( - context: FlattenedServersideContextKey, - impression: Impress - ): ImpressedItem = - Item.TweetInfo( - TweetInfo( - actionTweetId = context.serversideContextId.toLong, - tweetActionInfo = Some( - TweetActionInfo.ClientTweetV2Impression( - ClientTweetV2Impression( - impressStartTimestampMs = getImpressedStartTimestamp(impression), - impressEndTimestampMs = getImpressedEndTimestamp(impression), - sourceComponent = getImpressedUISourceComponent(context) - ) - )) - )) - - private def getProductSurfaceRelated: ProductSurfaceRelated = - actionType match { - case ActionType.ClientTweetV2Impression => - ProductSurfaceRelated( - productSurface = Some(ProductSurface.TweetDetailsPage), - productSurfaceInfo = None) - case _ => ProductSurfaceRelated(productSurface = None, productSurfaceInfo = None) - } -} diff --git a/unified_user_actions/adapter/src/test/scala/com/twitter/unified_user_actions/adapter/BehavioralClientEventAdapterSpec.scala b/unified_user_actions/adapter/src/test/scala/com/twitter/unified_user_actions/adapter/BehavioralClientEventAdapterSpec.scala deleted file mode 100644 index 3d834c89b..000000000 --- a/unified_user_actions/adapter/src/test/scala/com/twitter/unified_user_actions/adapter/BehavioralClientEventAdapterSpec.scala +++ /dev/null @@ -1,139 +0,0 @@ -package com.twitter.unified_user_actions.adapter - -import com.twitter.inject.Test -import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog -import com.twitter.unified_user_actions.adapter.TestFixtures.BCEFixture -import com.twitter.unified_user_actions.adapter.behavioral_client_event.BehavioralClientEventAdapter -import com.twitter.unified_user_actions.thriftscala._ -import com.twitter.util.Time -import org.scalatest.prop.TableDrivenPropertyChecks - -class BehavioralClientEventAdapterSpec extends Test with TableDrivenPropertyChecks { - - test("basic event conversion should be correct") { - new BCEFixture { - Time.withTimeAt(frozenTime) { _ => - val tests = Table( - ("event", "expected", "description"), - ( - makeBCEEvent(), - makeUUAImpressEvent(productSurface = Some(ProductSurface.TweetDetailsPage)), - "tweet_details conversion"), - (makeBCEProfileImpressEvent(), makeUUAProfileImpressEvent(), "profile conversion"), - ( - makeBCEVideoFullscreenImpressEvent(), - makeUUAVideoFullscreenImpressEvent(), - "fullscreen_video conversion"), - ( - makeBCEImageFullscreenImpressEvent(), - makeUUAImageFullscreenImpressEvent(), - "fullscreen_image conversion"), - ) - forEvery(tests) { (input: FlattenedEventLog, expected: UnifiedUserAction, desc: String) => - assert(Seq(expected) === BehavioralClientEventAdapter.adaptEvent(input), desc) - } - } - } - } - - test( - "tweet_details is NOT missing productSurface[Info] when empty breadcrumb components and breadcrumbs tweets id") { - new BCEFixture { - Time.withTimeAt(frozenTime) { _ => - val input = makeBCEEvent(v1BreadcrumbViewTypeHierarchy = None, v1BreadcrumbTweetIds = None) - val expected = - makeUUAImpressEvent( - productSurface = Some(ProductSurface.TweetDetailsPage), - breadcrumbViews = None, - breadcrumbTweets = None) - val actual = BehavioralClientEventAdapter.adaptEvent(input) - - assert(Seq(expected) === actual) - } - } - } - - test("tweet_details is not missing productSurface[Info] when only breadcrumb tweets is empty") { - new BCEFixture { - Time.withTimeAt(frozenTime) { _ => - val input = makeBCEEvent(v1BreadcrumbTweetIds = None) - val expected = makeUUAImpressEvent( - productSurface = Some(ProductSurface.TweetDetailsPage), - breadcrumbViews = Some(viewBreadcrumbs), - breadcrumbTweets = None - ) - val actual = BehavioralClientEventAdapter.adaptEvent(input) - - assert(Seq(expected) === actual) - } - } - } - - test("unsupported events should be skipped") { - new BCEFixture { - val unsupportedPage = "unsupported_page" - val unsupportedAction = "unsupported_action" - val supportedNamespaces = Table( - ("page", "actions"), - ("tweet_details", Seq("impress")), - ("profile", Seq("impress")), - ) - - forAll(supportedNamespaces) { (page: String, actions: Seq[String]) => - actions.foreach { supportedAction => - assert( - BehavioralClientEventAdapter - .adaptEvent( - makeBCEEvent( - currentPage = Some(unsupportedPage), - actionName = Some(supportedAction))).isEmpty) - - assert(BehavioralClientEventAdapter - .adaptEvent( - makeBCEEvent(currentPage = Some(page), actionName = Some(unsupportedAction))).isEmpty) - } - } - } - } - - test("event w/ missing info should be skipped") { - new BCEFixture { - val eventsWithMissingInfo = Table( - ("event", "description"), - (null.asInstanceOf[FlattenedEventLog], "null event"), - (makeBCEEvent(v2Impress = None), "impression event missing v2Impress"), - (makeBCEEvent(v1TweetIds = None), "tweet event missing v1TweetIds"), - (makeBCEProfileImpressEvent(v1UserIds = None), "profile event missing v1UserIds"), - ( - makeBCEVideoFullscreenImpressEvent(v1BreadcrumbTweetIds = None), - "fullscreen_video event missing v1BreadcrumbTweetIds"), - ( - makeBCEImageFullscreenImpressEvent(v1BreadcrumbTweetIds = None), - "fullscreen_image event missing v1BreadcrumbTweetIds"), - ) - - forEvery(eventsWithMissingInfo) { (event: FlattenedEventLog, desc: String) => - assert( - BehavioralClientEventAdapter - .adaptEvent(event).isEmpty, - desc) - } - } - } - - test("use eventCreateAtMs when driftAdjustedTimetampMs is empty") { - new BCEFixture { - Time.withTimeAt(frozenTime) { _ => - val input = makeBCEEvent( - context = makeBCEContext(driftAdjustedEventCreatedAtMs = None) - ) - val expected = makeUUAImpressEvent( - createTs = eventCreatedTime, - productSurface = Some(ProductSurface.TweetDetailsPage)) - val actual = BehavioralClientEventAdapter.adaptEvent(input) - - assert(Seq(expected) === actual) - } - } - } -} diff --git a/unified_user_actions/service/src/main/scala/com/twitter/unified_user_actions/service/BehavioralClientEventService.scala b/unified_user_actions/service/src/main/scala/com/twitter/unified_user_actions/service/BehavioralClientEventService.scala deleted file mode 100644 index 43ca35ad1..000000000 --- a/unified_user_actions/service/src/main/scala/com/twitter/unified_user_actions/service/BehavioralClientEventService.scala +++ /dev/null @@ -1,25 +0,0 @@ -package com.twitter.unified_user_actions.service; - -import com.twitter.finatra.decider.modules.DeciderModule -import com.twitter.finatra.kafka.serde.UnKeyed -import com.twitter.inject.server.TwitterServer -import com.twitter.kafka.client.processor.AtLeastOnceProcessor -import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog -import com.twitter.unified_user_actions.service.module.KafkaProcessorBehavioralClientEventModule - -object BehavioralClientEventServiceMain extends BehavioralClientEventService - -class BehavioralClientEventService extends TwitterServer { - override val modules = Seq( - KafkaProcessorBehavioralClientEventModule, - DeciderModule - ) - - override protected def setup(): Unit = {} - - override protected def start(): Unit = { - val processor = injector.instance[AtLeastOnceProcessor[UnKeyed, FlattenedEventLog]] - closeOnExit(processor) - processor.start() - } -} diff --git a/unified_user_actions/service/src/main/scala/com/twitter/unified_user_actions/service/module/KafkaProcessorBehavioralClientEventModule.scala b/unified_user_actions/service/src/main/scala/com/twitter/unified_user_actions/service/module/KafkaProcessorBehavioralClientEventModule.scala deleted file mode 100644 index 463c691e6..000000000 --- a/unified_user_actions/service/src/main/scala/com/twitter/unified_user_actions/service/module/KafkaProcessorBehavioralClientEventModule.scala +++ /dev/null @@ -1,87 +0,0 @@ -package com.twitter.unified_user_actions.service.module - -import com.google.inject.Provides -import com.twitter.decider.Decider -import com.twitter.finagle.stats.StatsReceiver -import com.twitter.finatra.kafka.serde.UnKeyed -import com.twitter.finatra.kafka.serde.UnKeyedSerde -import com.twitter.inject.annotations.Flag -import com.twitter.inject.TwitterModule -import com.twitter.kafka.client.processor.AtLeastOnceProcessor -import com.twitter.storage.behavioral_event.thriftscala.FlattenedEventLog -import com.twitter.unified_user_actions.adapter.behavioral_client_event.BehavioralClientEventAdapter -import com.twitter.unified_user_actions.kafka.CompressionTypeFlag -import com.twitter.unified_user_actions.kafka.serde.NullableScalaSerdes -import com.twitter.util.Duration -import com.twitter.util.StorageUnit -import com.twitter.util.logging.Logging -import javax.inject.Singleton - -object KafkaProcessorBehavioralClientEventModule extends TwitterModule with Logging { - override def modules = Seq(FlagsModule) - - private val adapter: BehavioralClientEventAdapter = new BehavioralClientEventAdapter - private final val processorName: String = "uuaProcessor" - - @Provides - @Singleton - def providesKafkaProcessor( - decider: Decider, - @Flag(FlagsModule.cluster) cluster: String, - @Flag(FlagsModule.kafkaSourceCluster) kafkaSourceCluster: String, - @Flag(FlagsModule.kafkaDestCluster) kafkaDestCluster: String, - @Flag(FlagsModule.kafkaSourceTopic) kafkaSourceTopic: String, - @Flag(FlagsModule.kafkaSinkTopics) kafkaSinkTopics: Seq[String], - @Flag(FlagsModule.kafkaGroupId) kafkaGroupId: String, - @Flag(FlagsModule.kafkaProducerClientId) kafkaProducerClientId: String, - @Flag(FlagsModule.kafkaMaxPendingRequests) kafkaMaxPendingRequests: Int, - @Flag(FlagsModule.kafkaWorkerThreads) kafkaWorkerThreads: Int, - @Flag(FlagsModule.commitInterval) commitInterval: Duration, - @Flag(FlagsModule.maxPollRecords) maxPollRecords: Int, - @Flag(FlagsModule.maxPollInterval) maxPollInterval: Duration, - @Flag(FlagsModule.sessionTimeout) sessionTimeout: Duration, - @Flag(FlagsModule.fetchMax) fetchMax: StorageUnit, - @Flag(FlagsModule.batchSize) batchSize: StorageUnit, - @Flag(FlagsModule.linger) linger: Duration, - @Flag(FlagsModule.bufferMem) bufferMem: StorageUnit, - @Flag(FlagsModule.compressionType) compressionTypeFlag: CompressionTypeFlag, - @Flag(FlagsModule.retries) retries: Int, - @Flag(FlagsModule.retryBackoff) retryBackoff: Duration, - @Flag(FlagsModule.requestTimeout) requestTimeout: Duration, - @Flag(FlagsModule.enableTrustStore) enableTrustStore: Boolean, - @Flag(FlagsModule.trustStoreLocation) trustStoreLocation: String, - statsReceiver: StatsReceiver, - ): AtLeastOnceProcessor[UnKeyed, FlattenedEventLog] = { - KafkaProcessorProvider.provideDefaultAtLeastOnceProcessor( - name = processorName, - kafkaSourceCluster = kafkaSourceCluster, - kafkaGroupId = kafkaGroupId, - kafkaSourceTopic = kafkaSourceTopic, - sourceKeyDeserializer = UnKeyedSerde.deserializer, - sourceValueDeserializer = NullableScalaSerdes - .Thrift[FlattenedEventLog](statsReceiver.counter("deserializerErrors")).deserializer, - commitInterval = commitInterval, - maxPollRecords = maxPollRecords, - maxPollInterval = maxPollInterval, - sessionTimeout = sessionTimeout, - fetchMax = fetchMax, - processorMaxPendingRequests = kafkaMaxPendingRequests, - processorWorkerThreads = kafkaWorkerThreads, - adapter = adapter, - kafkaSinkTopics = kafkaSinkTopics, - kafkaDestCluster = kafkaDestCluster, - kafkaProducerClientId = kafkaProducerClientId, - batchSize = batchSize, - linger = linger, - bufferMem = bufferMem, - compressionType = compressionTypeFlag.compressionType, - retries = retries, - retryBackoff = retryBackoff, - requestTimeout = requestTimeout, - statsReceiver = statsReceiver, - trustStoreLocationOpt = if (enableTrustStore) Some(trustStoreLocation) else None, - decider = decider, - zone = ZoneFiltering.zoneMapping(cluster), - ) - } -}