kopia lustrzana https://github.com/twitter/the-algorithm
355 wiersze
13 KiB
Scala
355 wiersze
13 KiB
Scala
package com.twitter.simclusters_v2.scalding.embedding
|
|
|
|
import com.twitter.dal.client.dataset.KeyValDALDataset
|
|
import com.twitter.recos.entities.thriftscala.Entity
|
|
import com.twitter.recos.entities.thriftscala.Hashtag
|
|
import com.twitter.recos.entities.thriftscala.SemanticCoreEntity
|
|
import com.twitter.scalding._
|
|
import com.twitter.scalding_internal.dalv2.DALWrite._
|
|
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
|
|
import com.twitter.simclusters_v2.common.ModelVersions
|
|
import com.twitter.simclusters_v2.common.SimClustersEmbedding
|
|
import com.twitter.simclusters_v2.hdfs_sources._
|
|
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil
|
|
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil._
|
|
import com.twitter.simclusters_v2.scalding.embedding.common.EntityEmbeddingUtil
|
|
import com.twitter.simclusters_v2.scalding.embedding.common.SimClustersEmbeddingJob
|
|
import com.twitter.simclusters_v2.thriftscala.{
|
|
SimClustersEmbedding => ThriftSimClustersEmbedding,
|
|
_
|
|
}
|
|
import com.twitter.wtf.entity_real_graph.common.EntityUtil
|
|
import com.twitter.wtf.entity_real_graph.thriftscala.EntityType
|
|
import com.twitter.wtf.scalding.jobs.common.AdhocExecutionApp
|
|
import com.twitter.wtf.scalding.jobs.common.DataSources
|
|
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
|
|
import java.util.TimeZone
|
|
|
|
/**
 * Adhoc job that computes entity -> SimClusters embeddings (and the cluster -> entities
 * reverse index) and writes both to adhoc key-val sources.
 *
 * $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embeddings_job-adhoc
 *
 * ---------------------- Deploy to atla ----------------------
 * $ scalding remote run \
   --main-class com.twitter.simclusters_v2.scalding.embedding.EntityToSimClustersEmbeddingAdhocApp \
   --target src/scala/com/twitter/simclusters_v2/scalding/embedding:entity_embeddings_job-adhoc \
   --user recos-platform \
   -- --date 2019-09-09 --model-version 20M_145K_updated --entity-type SemanticCore
 */
object EntityToSimClustersEmbeddingAdhocApp extends AdhocExecutionApp {

  import EmbeddingUtil._
  import EntityEmbeddingUtil._
  import EntityToSimClustersEmbeddingsJob._
  import EntityUtil._
  import SimClustersEmbeddingJob._

  /**
   * Writes the two adhoc outputs of the job and completes when both writes have finished.
   *
   * @param embeddings     one row per (embedding id, (cluster id, score)) pair; used to build
   *                       the reverse index via `toReverseIndexSimClusterEmbedding`
   * @param topKEmbeddings per embedding id, the clusters that were kept, with their scores
   * @param jobConfig      supplies `topK`, `modelVersion` and `entityType` for the output paths
   * @return an Execution that runs both writes (zipped) and discards their results
   */
  def writeOutput(
    embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
    topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
    jobConfig: EntityEmbeddingsJobConfig
  ): Execution[Unit] = {

    // Forward index: embedding id -> thrift SimClustersEmbedding.
    val toSimClusterEmbeddingExec = topKEmbeddings
      .mapValues(SimClustersEmbedding.apply(_).toThrift)
      .writeExecution(
        AdhocKeyValSources.entityToClustersSource(
          EntityToSimClustersEmbeddingsJob.getHdfsPath(
            isAdhoc = true,
            isManhattanKeyVal = true,
            isReverseIndex = false,
            jobConfig.modelVersion,
            jobConfig.entityType)))

    // Reverse index, written under the "reverse_index/" variant of the same path.
    val fromSimClusterEmbeddingExec =
      toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
        .writeExecution(
          AdhocKeyValSources.clusterToEntitiesSource(
            EntityToSimClustersEmbeddingsJob.getHdfsPath(
              isAdhoc = true,
              isManhattanKeyVal = true,
              isReverseIndex = true,
              jobConfig.modelVersion,
              jobConfig.entityType)))

    Execution.zip(toSimClusterEmbeddingExec, fromSimClusterEmbeddingExec).unit
  }

  /**
   * Builds the adhoc pipeline: entity/user matrix -> normalized transpose -> embeddings ->
   * top-K per embedding id -> output.
   *
   * Recognised arguments (besides those read by `EntityEmbeddingsJobConfig`):
   *  - `--m`: number of reducers, defaults to 1000.
   *
   * @throws IllegalArgumentException if the configured entity type is neither SemanticCore
   *                                  nor Hashtag
   */
  override def runOnDateRange(
    args: Args
  )(
    implicit dateRange: DateRange,
    timeZone: TimeZone,
    uniqueID: UniqueID
  ): Execution[Unit] = {

    val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = true)

    val numReducers = args.getOrElse("m", "1000").toInt

    /*
      Using the ERG daily dataset in the adhoc job for quick prototyping, note that there may be
      issues with scaling the job when productionizing on ERG aggregated dataset.
     */
    val entityRealGraphSource = DataSources.entityRealGraphDailyDataSetSource

    val entityUserMatrix: TypedPipe[(Entity, (UserId, Double))] =
      (jobConfig.entityType match {
        // Both supported entity types are handled identically; only the type tag differs.
        case entityType @ (EntityType.SemanticCore | EntityType.Hashtag) =>
          getEntityUserMatrix(entityRealGraphSource, jobConfig.halfLife, entityType)
        case _ =>
          throw new IllegalArgumentException(
            s"Argument [--entity-type] must be provided. Supported options [${EntityType.SemanticCore.name}, ${EntityType.Hashtag.name}]")
      }).forceToDisk

    val normalizedUserEntityMatrix =
      getNormalizedTransposeInputMatrix(entityUserMatrix, numReducers = Some(numReducers))

    // Determine which data source to use based on model version.
    val simClustersSource = jobConfig.modelVersion match {
      case ModelVersion.Model20m145kUpdated =>
        InterestedInSources.simClustersInterestedInUpdatedSource(dateRange, timeZone)
      case _ =>
        InterestedInSources.simClustersInterestedInDec11Source(dateRange, timeZone)
    }

    // The model version handed to computeEmbeddings must be the configured one: the
    // interested-in source above, the embedding ids below and the output paths are all
    // derived from jobConfig.modelVersion, so a hard-coded version here would disagree with
    // them whenever the job is run for any other model.
    // NOTE(review): computeEmbeddings is defined elsewhere; confirm how it uses this argument.
    val embeddings = computeEmbeddings(
      simClustersSource,
      normalizedUserEntityMatrix,
      scoreExtractors,
      jobConfig.modelVersion,
      toSimClustersEmbeddingId(jobConfig.modelVersion),
      numReducers = Some(numReducers * 2)
    )

    // Keep, per embedding id, the topK (clusterId, score) pairs with the highest score.
    val topKEmbeddings =
      embeddings.group
        .sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))
        .withReducers(numReducers)

    writeOutput(embeddings, topKEmbeddings, jobConfig)
  }
}
|
|
|
|
/**
|
|
* $ ./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding:semantic_core_entity_embeddings_2020_job
|
|
* $ capesospy-v2 update \
|
|
--build_locally \
|
|
--start_cron semantic_core_entity_embeddings_2020_job src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
|
|
*/
|
|
// Concrete scheduled entry point: a body-less object, so all of its behaviour is inherited
// unchanged from EntityToSimClustersEmbeddingApp.
object SemanticCoreEntityEmbeddings2020App extends EntityToSimClustersEmbeddingApp
|
|
|
|
/**
 * Scheduled job that computes entity -> SimClusters embeddings and writes the forward index
 * and the cluster -> entities reverse index as versioned DAL key-val datasets.
 *
 * Only `ModelVersion.Model20m145k2020` is accepted: any other configured model version makes
 * `runOnDateRange` throw an IllegalArgumentException while the pipeline is being assembled.
 */
trait EntityToSimClustersEmbeddingApp extends ScheduledExecutionApp {

  import EmbeddingUtil._
  import EntityEmbeddingUtil._
  import EntityToSimClustersEmbeddingsJob._
  import EntityUtil._
  import SimClustersEmbeddingJob._

  override val firstTime: RichDate = RichDate("2023-01-01")

  override val batchIncrement: Duration = Days(7)

  /**
   * Writes the forward and reverse embedding indexes and completes when both are written.
   *
   * @param embeddings               one row per (embedding id, (cluster id, score)) pair; input
   *                                 to `toReverseIndexSimClusterEmbedding`
   * @param topKEmbeddings           per embedding id, the clusters that were kept, with scores
   * @param jobConfig                supplies `topK`, `modelVersion` and `entityType`
   * @param clusterEmbeddingsDataset DAL dataset receiving embedding id -> thrift embedding
   * @param entityEmbeddingsDataset  DAL dataset receiving the reverse index
   * @return an Execution that runs both writes (zipped) and discards their results
   */
  private def writeOutput(
    embeddings: TypedPipe[(SimClustersEmbeddingId, (ClusterId, EmbeddingScore))],
    topKEmbeddings: TypedPipe[(SimClustersEmbeddingId, Seq[(ClusterId, EmbeddingScore)])],
    jobConfig: EntityEmbeddingsJobConfig,
    clusterEmbeddingsDataset: KeyValDALDataset[
      KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
    ],
    entityEmbeddingsDataset: KeyValDALDataset[KeyVal[SimClustersEmbeddingId, InternalIdEmbedding]]
  ): Execution[Unit] = {

    // Forward index: embedding id -> thrift SimClustersEmbedding.
    val toSimClustersEmbeddings =
      topKEmbeddings
        .mapValues(SimClustersEmbedding.apply(_).toThrift)
        .map {
          case (entityId, topSimClusters) => KeyVal(entityId, topSimClusters)
        }
        .writeDALVersionedKeyValExecution(
          clusterEmbeddingsDataset,
          D.Suffix(
            EntityToSimClustersEmbeddingsJob.getHdfsPath(
              isAdhoc = false,
              isManhattanKeyVal = true,
              isReverseIndex = false,
              jobConfig.modelVersion,
              jobConfig.entityType))
        )

    // Reverse index, written under the "reverse_index/" variant of the same path.
    val fromSimClustersEmbeddings =
      toReverseIndexSimClusterEmbedding(embeddings, jobConfig.topK)
        .map {
          case (embeddingId, internalIdsWithScore) =>
            KeyVal(embeddingId, internalIdsWithScore)
        }
        .writeDALVersionedKeyValExecution(
          entityEmbeddingsDataset,
          D.Suffix(
            EntityToSimClustersEmbeddingsJob.getHdfsPath(
              isAdhoc = false,
              isManhattanKeyVal = true,
              isReverseIndex = true,
              jobConfig.modelVersion,
              jobConfig.entityType))
        )

    Execution.zip(toSimClustersEmbeddings, fromSimClustersEmbeddings).unit
  }

  /**
   * Builds the scheduled pipeline: entity/user matrix (from the aggregated entity real graph,
   * with the date range widened via `embiggen(Days(7))`) -> normalized transpose ->
   * embeddings -> top-K per embedding id -> DAL outputs.
   *
   * @throws IllegalArgumentException if the configured model version is not Model20m145k2020
   */
  override def runOnDateRange(
    args: Args
  )(
    implicit dateRange: DateRange,
    timeZone: TimeZone,
    uniqueID: UniqueID
  ): Execution[Unit] = {

    val jobConfig = EntityEmbeddingsJobConfig(args, isAdhoc = false)

    val embeddingsDataset = EntityEmbeddingsSources.getEntityEmbeddingsDataset(
      jobConfig.entityType,
      ModelVersions.toKnownForModelVersion(jobConfig.modelVersion)
    )

    val reverseIndexEmbeddingsDataset =
      EntityEmbeddingsSources.getReverseIndexedEntityEmbeddingsDataset(
        jobConfig.entityType,
        ModelVersions.toKnownForModelVersion(jobConfig.modelVersion)
      )

    val entityRealGraphSource =
      DataSources.entityRealGraphAggregationDataSetSource(dateRange.embiggen(Days(7)))

    val entityUserMatrix: TypedPipe[(Entity, (UserId, Double))] =
      getEntityUserMatrix(
        entityRealGraphSource,
        jobConfig.halfLife,
        jobConfig.entityType).forceToDisk

    val normalizedUserEntityMatrix = getNormalizedTransposeInputMatrix(entityUserMatrix)

    // Fails fast for every model version other than 2020.
    val simClustersEmbedding = jobConfig.modelVersion match {
      case ModelVersion.Model20m145k2020 =>
        val simClustersSource2020 =
          InterestedInSources.simClustersInterestedIn2020Source(dateRange, timeZone)
        computeEmbeddings(
          simClustersSource2020,
          normalizedUserEntityMatrix,
          scoreExtractors,
          ModelVersion.Model20m145k2020,
          toSimClustersEmbeddingId(ModelVersion.Model20m145k2020)
        )
      case modelVersion =>
        throw new IllegalArgumentException(s"Model Version ${modelVersion.name} not supported")
    }

    // Keep, per embedding id, the topK (clusterId, score) pairs with the highest score.
    val topKEmbeddings =
      simClustersEmbedding.group.sortedReverseTake(jobConfig.topK)(Ordering.by(_._2))

    val simClustersEmbeddingsExec =
      writeOutput(
        simClustersEmbedding,
        topKEmbeddings,
        jobConfig,
        embeddingsDataset,
        reverseIndexEmbeddingsDataset)

    // We don't support embeddingsLite for the 2020 model version.
    // NOTE(review): the match above throws for every version except Model20m145k2020, so as
    // written this condition cannot hold here and the job always takes the Execution.unit
    // branch. The branch is kept in case support for the updated model is restored.
    val embeddingsLiteExec = if (jobConfig.modelVersion == ModelVersion.Model20m145kUpdated) {
      topKEmbeddings
        .collect {
          // Only fav-based semantic-core embeddings of the updated model are exported.
          case (
                SimClustersEmbeddingId(
                  EmbeddingType.FavBasedSematicCoreEntity,
                  ModelVersion.Model20m145kUpdated,
                  InternalId.EntityId(entityId)),
                clustersWithScores) =>
            entityId -> clustersWithScores
        }
        .flatMap {
          // The tuple pattern always matches, so no fallback case is needed.
          case (entityId, clustersWithScores) =>
            clustersWithScores.map {
              case (clusterId, score) => EmbeddingsLite(entityId, clusterId, score)
            }
        }
        .writeDALSnapshotExecution(
          SimclustersV2EmbeddingsLiteScalaDataset,
          D.Daily,
          D.Suffix(embeddingsLitePath(ModelVersion.Model20m145kUpdated, "fav_based")),
          D.EBLzo(),
          dateRange.end)
    } else {
      Execution.unit
    }

    Execution
      .zip(simClustersEmbeddingsExec, embeddingsLiteExec).unit
  }
}
|
|
|
|
/**
 * Helpers shared by the adhoc and scheduled entity-embedding jobs: embedding-id construction
 * and output-path generation.
 */
object EntityToSimClustersEmbeddingsJob {

  /**
   * Returns a function that maps an (entity, score type) pair to the SimClustersEmbeddingId
   * for the given model version.
   *
   * Supported combinations: SemanticCore or Hashtag entities with FavScore or FollowScore.
   *
   * @param modelVersion model version stored in every id the returned function produces
   * @throws IllegalArgumentException (from the returned function) for any other combination
   */
  def toSimClustersEmbeddingId(
    modelVersion: ModelVersion
  ): (Entity, ScoreType.ScoreType) => SimClustersEmbeddingId = {
    case (Entity.SemanticCore(SemanticCoreEntity(entityId, _)), ScoreType.FavScore) =>
      SimClustersEmbeddingId(
        EmbeddingType.FavBasedSematicCoreEntity,
        modelVersion,
        InternalId.EntityId(entityId))
    case (Entity.SemanticCore(SemanticCoreEntity(entityId, _)), ScoreType.FollowScore) =>
      SimClustersEmbeddingId(
        EmbeddingType.FollowBasedSematicCoreEntity,
        modelVersion,
        InternalId.EntityId(entityId))
    case (Entity.Hashtag(Hashtag(hashtag)), ScoreType.FavScore) =>
      SimClustersEmbeddingId(
        EmbeddingType.FavBasedHashtagEntity,
        modelVersion,
        InternalId.Hashtag(hashtag))
    case (Entity.Hashtag(Hashtag(hashtag)), ScoreType.FollowScore) =>
      SimClustersEmbeddingId(
        EmbeddingType.FollowBasedHashtagEntity,
        modelVersion,
        InternalId.Hashtag(hashtag))
    // The arguments arrive as (Entity, ScoreType); bind them in that order so the message
    // below reports each value under the right label.
    case (entity, scoreType) =>
      throw new IllegalArgumentException(
        s"(ScoreType, Entity) ($scoreType, ${entity.toString}) not supported")
  }

  /**
   * Generates the output path for the Entity Embeddings Job.
   *
   * Example Adhoc: /user/recos-platform/processed/adhoc/simclusters_embeddings/hashtag/model_20m_145k_updated
   * Example Prod: /atla/proc/user/cassowary/processed/simclusters_embeddings/semantic_core/model_20m_145k_dec11
   *
   * The path suffix passed to `EmbeddingUtil.getHdfsPath` is the entity-type name
   * ("semantic_core", "hashtag", or "unknown" for any other type), prefixed with
   * "reverse_index/" when `isReverseIndex` is set.
   */
  def getHdfsPath(
    isAdhoc: Boolean,
    isManhattanKeyVal: Boolean,
    isReverseIndex: Boolean,
    modelVersion: ModelVersion,
    entityType: EntityType
  ): String = {

    val reverseIndex = if (isReverseIndex) "reverse_index/" else ""

    val entityTypeSuffix = entityType match {
      case EntityType.SemanticCore => "semantic_core"
      case EntityType.Hashtag => "hashtag"
      case _ => "unknown"
    }

    val pathSuffix = s"$reverseIndex$entityTypeSuffix"

    EmbeddingUtil.getHdfsPath(isAdhoc, isManhattanKeyVal, modelVersion, pathSuffix)
  }

  /**
   * Builds the path for the "lite" embeddings snapshot from the model version (interpolated
   * via its string representation) and a caller-supplied suffix.
   */
  def embeddingsLitePath(modelVersion: ModelVersion, pathSuffix: String): String = {
    s"/user/cassowary/processed/entity_real_graph/simclusters_embedding/lite/$modelVersion/$pathSuffix/"
  }
}
|