twitter-the-algorithm/src/scala/com/twitter/simclusters_v2/scalding/embedding/EntityToSimClustersEmbeddin...

355 wiersze
13 KiB
Scala

package com.twitter.simclusters_v2.scalding.embedding
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.recos.entities.thriftscala.Entity
import com.twitter.recos.entities.thriftscala.Hashtag
import com.twitter.recos.entities.thriftscala.SemanticCoreEntity
import com.twitter.scalding._
import com.twitter.scalding_internal.dalv2.DALWrite._
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.common.ModelVersions
import com.twitter.simclusters_v2.common.SimClustersEmbedding
import com.twitter.simclusters_v2.hdfs_sources._
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil._
import com.twitter.simclusters_v2.scalding.embedding.common.EntityEmbeddingUtil
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
import com.twitter.simclusters_v2.thriftscala.{
SimClustersEmbedding => ThriftSimClustersEmbedding,
_
}
import com.twitter.wtf.entity_real_graph.common.EntityUtil
import com.twitter.wtf.entity_real_graph.thriftscala.EntityType
import com.twitter.wtf.scalding.jobs.common.AdhocExecutionApp
import com.twitter.wtf.scalding.jobs.common.DataSources
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import java.util.TimeZone
/**
 * Adhoc job that computes entity -> SimClusters embeddings, plus the matching reverse index,
 * from the Entity Real Graph daily dataset and writes both to the adhoc key-val sources.
 *
 * $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embeddings_job-adhoc
 *
 * ---------------------- Deploy to atla ----------------------
 * $ scalding remote run \
   --main-class com.twitter.simclusters_v2.scalding.embedding.EntityToSimClustersEmbeddingAdhocApp \
   --target src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embeddings_job-adhoc \
   --user recos-platform \
   -- --date 2019-09-09 --model-version 20M_145K_updated --entity-type SemanticCore
 */
object EntityToSimClustersEmbeddingAdhocApp extends AdhocExecutionApp {

  import EmbeddingUtil._
  import EntityEmbeddingUtil._
  import EntityToSimClustersEmbeddingsJob._
  import EntityUtil._
  import SimClustersEmbeddingJob._

  /**
   * Writes the forward index (embedding id -> top clusters) and the reverse index produced by
   * `toReverseIndexSimClusterEmbedding` to their adhoc HDFS locations.
   *
   * @param embeddings     one (embedding id, (cluster, score)) record per non-zero entry
   * @param topKEmbeddings per embedding id, the clusters retained after the top-K cut
   * @param jobConfig      supplies the model version, entity type and top-K used for the paths
   *                       and for the reverse index
   * @return an execution that completes once both writes have completed
   */
  def writeOutput(
    embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
    topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
    jobConfig: EntityEmbeddingsJobConfig
  ): Execution[Unit] = {

    val toSimClusterEmbeddingExec = topKEmbeddings
      .mapValues(SimClustersEmbedding.apply(_).toThrift)
      .writeExecution(
        AdhocKeyValSources.entityToClustersSource(
          EntityToSimClustersEmbeddingsJob.getHdfsPath(
            isAdhoc = true,
            isManhattanKeyVal = true,
            isReverseIndex = false,
            jobConfig.modelVersion,
            jobConfig.entityType)))

    val fromSimClusterEmbeddingExec =
      toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
        .writeExecution(
          AdhocKeyValSources.clusterToEntitiesSource(
            EntityToSimClustersEmbeddingsJob.getHdfsPath(
              isAdhoc = true,
              isManhattanKeyVal = true,
              isReverseIndex = true,
              jobConfig.modelVersion,
              jobConfig.entityType)))

    Execution.zip(toSimClusterEmbeddingExec, fromSimClusterEmbeddingExec).unit
  }

  /**
   * Builds the embeddings for the configured entity type and model version and writes them out.
   *
   * Recognised arguments besides those consumed by `EntityEmbeddingsJobConfig`:
   *  - `--m`: base number of reducers (defaults to 1000).
   *
   * @throws IllegalArgumentException if the entity type is neither SemanticCore nor Hashtag
   */
  override def runOnDateRange(
    args: Args
  )(
    implicit dateRange: DateRange,
    timeZone: TimeZone,
    uniqueID: UniqueID
  ): Execution[Unit] = {

    val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = true)

    val numReducers = args.getOrElse("m", "1000").toInt

    /*
      Using the ERG daily dataset in the adhoc job for quick prototyping, note that there may be
      issues with scaling the job when productionizing on ERG aggregated dataset.
     */
    val entityRealGraphSource = DataSources.entityRealGraphDailyDataSetSource

    // Materialised with forceToDisk before it is handed to the normalisation step.
    val entityUserMatrix: TypedPipe[(Entity, (UserId, Double))] =
      (jobConfig.entityType match {
        case EntityType.SemanticCore =>
          getEntityUserMatrix(entityRealGraphSource, jobConfig.halfLife, EntityType.SemanticCore)
        case EntityType.Hashtag =>
          getEntityUserMatrix(entityRealGraphSource, jobConfig.halfLife, EntityType.Hashtag)
        case _ =>
          throw new IllegalArgumentException(
            s"Argument [--entity-type] must be provided. Supported options [${EntityType.SemanticCore.name}, ${EntityType.Hashtag.name}]")
      }).forceToDisk

    val normalizedUserEntityMatrix =
      getNormalizedTransposeInputMatrix(entityUserMatrix, numReducers = Some(numReducers))

    // Determine which data source to use based on model version.
    val simClustersSource = jobConfig.modelVersion match {
      case ModelVersion.Model20m145kUpdated =>
        InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone)
      case _ =>
        InterestedInSources.simClustersInterestedInDec11Source(dateRange, timeZone)
    }

    // Pass the configured model version rather than a fixed one, so that the source selected
    // above, the version handed to computeEmbeddings and the version stamped into the
    // embedding ids all agree.
    // NOTE(review): computeEmbeddings presumably uses this argument to select records of the
    // matching model version - confirm against SimClustersEmbeddingJob.
    val embeddings = computeEmbeddings(
      simClustersSource,
      normalizedUserEntityMatrix,
      scoreExtractors,
      jobConfig.modelVersion,
      toSimClustersEmbeddingId(jobConfig.modelVersion),
      numReducers = Some(numReducers * 2)
    )

    // Keep, per embedding id, the topK clusters with the highest score.
    val topKEmbeddings =
      embeddings.group
        .sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
        .withReducers(numReducers)

    writeOutput(embeddings, topKEmbeddings, jobConfig)
  }
}
/**
 * Concrete entry point for the scheduled job; it inherits everything from
 * [[EntityToSimClustersEmbeddingApp]] and overrides nothing.
 *
 * $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:semantic_core_entity_embeddings_2020_job
 * $ capesospy-v2 update \
   --build_locally \
   --start_cron semantic_core_entity_embeddings_2020_job src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
 */
object SemanticCoreEntityEmbeddings2020App extends EntityToSimClustersEmbeddingApp
/**
 * Scheduled job that computes entity -> SimClusters embeddings from the aggregated Entity Real
 * Graph dataset and writes the embeddings and their reverse index as versioned DAL key-val
 * datasets.
 *
 * Only `ModelVersion.Model20m145k2020` is accepted; any other model version makes
 * `runOnDateRange` throw an IllegalArgumentException.
 */
trait EntityToSimClustersEmbeddingApp extends ScheduledExecutionApp {

  import EmbeddingUtil._
  import EntityEmbeddingUtil._
  import EntityToSimClustersEmbeddingsJob._
  import EntityUtil._
  import SimClustersEmbeddingJob._

  override val firstTime: RichDate = RichDate("2023-01-01")

  override val batchIncrement: Duration = Days(7)

  /**
   * Writes the forward index (embedding id -> top clusters) and the reverse index produced by
   * `toReverseIndexSimClusterEmbedding` to the given DAL datasets.
   *
   * @param embeddings               one (embedding id, (cluster, score)) record per entry
   * @param topKEmbeddings           per embedding id, the clusters retained after the top-K cut
   * @param jobConfig                supplies model version, entity type and top-K
   * @param clusterEmbeddingsDataset destination of the forward index
   * @param entityEmbeddingsDataset  destination of the reverse index
   * @return an execution that completes once both writes have completed
   */
  private def writeOutput(
    embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
    topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
    jobConfig: EntityEmbeddingsJobConfig,
    clusterEmbeddingsDataset: KeyValDALDataset[
      KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
    ],
    entityEmbeddingsDataset: KeyValDALDataset[KeyVal[SimClustersEmbeddingId, InternalIdEmbedding]]
  ): Execution[Unit] = {

    val toSimClustersEmbeddings =
      topKEmbeddings
        .mapValues(SimClustersEmbedding.apply(_).toThrift)
        .map {
          case (entityId, topSimClusters) => KeyVal(entityId, topSimClusters)
        }
        .writeDALVersionedKeyValExecution(
          clusterEmbeddingsDataset,
          D.Suffix(
            EntityToSimClustersEmbeddingsJob.getHdfsPath(
              isAdhoc = false,
              isManhattanKeyVal = true,
              isReverseIndex = false,
              jobConfig.modelVersion,
              jobConfig.entityType))
        )

    val fromSimClustersEmbeddings =
      toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
        .map {
          case (embeddingId, internalIdsWithScore) =>
            KeyVal(embeddingId, internalIdsWithScore)
        }
        .writeDALVersionedKeyValExecution(
          entityEmbeddingsDataset,
          D.Suffix(
            EntityToSimClustersEmbeddingsJob.getHdfsPath(
              isAdhoc = false,
              isManhattanKeyVal = true,
              isReverseIndex = true,
              jobConfig.modelVersion,
              jobConfig.entityType))
        )

    Execution.zip(toSimClustersEmbeddings, fromSimClustersEmbeddings).unit
  }

  /**
   * Computes the embeddings for the configured entity type and writes them out.
   *
   * @throws IllegalArgumentException if the configured model version is not
   *                                  `ModelVersion.Model20m145k2020`
   */
  override def runOnDateRange(
    args: Args
  )(
    implicit dateRange: DateRange,
    timeZone: TimeZone,
    uniqueID: UniqueID
  ): Execution[Unit] = {

    val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = false)

    // Both destination datasets are looked up with the same known-for model version.
    val knownForModelVersion = ModelVersions.toKnownForModelVersion(jobConfig.modelVersion)

    val embeddingsDataset = EntityEmbeddingsSources.getEntityEmbeddingsDataset(
      jobConfig.entityType,
      knownForModelVersion
    )

    val reverseIndexEmbeddingsDataset =
      EntityEmbeddingsSources.getReverseIndexedEntityEmbeddingsDataset(
        jobConfig.entityType,
        knownForModelVersion
      )

    // The input range is widened by 7 days on top of the scheduled date range.
    val entityRealGraphSource =
      DataSources.entityRealGraphAggregationDataSetSource(dateRange.embiggen(Days(7)))

    val entityUserMatrix: TypedPipe[(Entity, (UserId, Double))] =
      getEntityUserMatrix(
        entityRealGraphSource,
        jobConfig.halfLife,
        jobConfig.entityType).forceToDisk

    val normalizedUserEntityMatrix = getNormalizedTransposeInputMatrix(entityUserMatrix)

    val simClustersEmbedding = jobConfig.modelVersion match {
      case ModelVersion.Model20m145k2020 =>
        val simClustersSource2020 =
          InterestedInSources.simClustersInterestedIn2020Source(dateRange, timeZone)
        computeEmbeddings(
          simClustersSource2020,
          normalizedUserEntityMatrix,
          scoreExtractors,
          ModelVersion.Model20m145k2020,
          toSimClustersEmbeddingId(ModelVersion.Model20m145k2020)
        )
      case modelVersion =>
        throw new IllegalArgumentException(s"Model Version ${modelVersion.name} not supported")
    }

    // Keep, per embedding id, the topK clusters with the highest score.
    val topKEmbeddings =
      simClustersEmbedding.group.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))

    val simClustersEmbeddingsExec =
      writeOutput(
        simClustersEmbedding,
        topKEmbeddings,
        jobConfig,
        embeddingsDataset,
        reverseIndexEmbeddingsDataset)

    // We don't support embeddingsLite for the 2020 model version.
    // The match above rejects every model version except Model20m145k2020, so this condition
    // cannot currently be true; the branch only becomes live again if that match is extended
    // to accept Model20m145kUpdated.
    val embeddingsLiteExec =
      if (jobConfig.modelVersion == ModelVersion.Model20m145kUpdated) {
        topKEmbeddings
          .collect {
            case (
                  SimClustersEmbeddingId(
                    EmbeddingType.FavBasedSematicCoreEntity,
                    ModelVersion.Model20m145kUpdated,
                    InternalId.EntityId(entityId)),
                  clustersWithScores) =>
              entityId -> clustersWithScores
          }
          .flatMap {
            // One EmbeddingsLite row per (entity, cluster) pair; the tuple pattern is total,
            // so no fallback case is needed.
            case (entityId, clustersWithScores) =>
              clustersWithScores.map {
                case (clusterId, score) => EmbeddingsLite(entityId, clusterId, score)
              }
          }
          .writeDALSnapshotExecution(
            SimclustersV2EmbeddingsLiteScalaDataset,
            D.Daily,
            D.Suffix(embeddingsLitePath(ModelVersion.Model20m145kUpdated, "fav_based")),
            D.EBLzo(),
            dateRange.end)
      } else {
        Execution.unit
      }

    Execution
      .zip(simClustersEmbeddingsExec, embeddingsLiteExec).unit
  }
}
/**
 * Helpers shared by the adhoc and scheduled entity embedding jobs: embedding id construction
 * and output path generation.
 */
object EntityToSimClustersEmbeddingsJob {

  /**
   * Returns a function that maps an (entity, score type) pair to the embedding id under which
   * its embedding is stored for the given model version.
   *
   * Supported combinations are SemanticCore and Hashtag entities with FavScore or FollowScore.
   *
   * @param modelVersion model version stamped into every produced id
   * @return the id-building function; it throws IllegalArgumentException for any other
   *         (entity, score type) combination
   */
  def toSimClustersEmbeddingId(
    modelVersion: ModelVersion
  ): (Entity, ScoreType.ScoreType) => SimClustersEmbeddingId = {
    case (Entity.SemanticCore(SemanticCoreEntity(entityId, _)), ScoreType.FavScore) =>
      SimClustersEmbeddingId(
        EmbeddingType.FavBasedSematicCoreEntity,
        modelVersion,
        InternalId.EntityId(entityId))
    case (Entity.SemanticCore(SemanticCoreEntity(entityId, _)), ScoreType.FollowScore) =>
      SimClustersEmbeddingId(
        EmbeddingType.FollowBasedSematicCoreEntity,
        modelVersion,
        InternalId.EntityId(entityId))
    case (Entity.Hashtag(Hashtag(hashtag)), ScoreType.FavScore) =>
      SimClustersEmbeddingId(
        EmbeddingType.FavBasedHashtagEntity,
        modelVersion,
        InternalId.Hashtag(hashtag))
    case (Entity.Hashtag(Hashtag(hashtag)), ScoreType.FollowScore) =>
      SimClustersEmbeddingId(
        EmbeddingType.FollowBasedHashtagEntity,
        modelVersion,
        InternalId.Hashtag(hashtag))
    // The arguments arrive as (entity, scoreType); bind them in that order so that each value
    // is reported under its own label in the message.
    case (entity, scoreType) =>
      throw new IllegalArgumentException(
        s"(ScoreType, Entity) ($scoreType, ${entity.toString}) not supported")
  }

  /**
   * Generates the output path for the Entity Embeddings Job.
   *
   * The path suffix is the entity type name ("semantic_core", "hashtag", or "unknown" for any
   * other entity type), prefixed with "reverse_index/" when `isReverseIndex` is set. The rest
   * of the path is produced by `EmbeddingUtil.getHdfsPath`.
   *
   * Example Adhoc: /user/recos-platform/processed/adhoc/simclusters_embeddings/hashtag/model_20m_145k_updated
   * Example Prod: /atla/proc/user/cassowary/processed/simclusters_embeddings/semantic_core/model_20m_145k_dec11
   *
   */
  def getHdfsPath(
    isAdhoc: Boolean,
    isManhattanKeyVal: Boolean,
    isReverseIndex: Boolean,
    modelVersion: ModelVersion,
    entityType: EntityType
  ): String = {

    val reverseIndex = if (isReverseIndex) "reverse_index/" else ""

    val entityTypeSuffix = entityType match {
      case EntityType.SemanticCore => "semantic_core"
      case EntityType.Hashtag => "hashtag"
      case _ => "unknown"
    }

    val pathSuffix = s"$reverseIndex$entityTypeSuffix"
    EmbeddingUtil.getHdfsPath(isAdhoc, isManhattanKeyVal, modelVersion, pathSuffix)
  }

  /**
   * Builds the embeddingsLite output path; the model version is interpolated via its
   * string representation, followed by `pathSuffix` and a trailing slash.
   */
  def embeddingsLitePath(modelVersion: ModelVersion, pathSuffix: String): String = {
    s"/user/cassowary/processed/entity_real_graph/simclusters_embedding/lite/$modelVersion/$pathSuffix/"
  }
}