fawkes/fawkes/differentiator.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2020-10-21
# @Author : Emily Wenger (ewenger@uchicago.edu)

import datetime
import time

import numpy as np
import tensorflow as tf

from fawkes.utils import preprocess, reverse_preprocess
from keras.utils import Progbar


class FawkesMaskGeneration:
    # if the attack is trying to mimic a target image or a neuron vector
    MIMIC_IMG = True
    # number of iterations to perform gradient descent
    MAX_ITERATIONS = 10000
    # larger values converge faster to less accurate results
    LEARNING_RATE = 1e-2
    # the initial constant c to pick as a first guess
    INITIAL_CONST = 1
    # pixel intensity range
    INTENSITY_RANGE = 'imagenet'
    # threshold for distance
    L_THRESHOLD = 0.03
    # whether to keep the final result or the best result
    KEEP_FINAL = False
    # max_val of image
    MAX_VAL = 255
    MAXIMIZE = False
    IMAGE_SHAPE = (112, 112, 3)
    RATIO = 1.0
    LIMIT_DIST = False
    LOSS_TYPE = 'features'  # use features (original Fawkes) or gradients (Witches' Brew) to run Fawkes?

    def __init__(self, bottleneck_model_ls, mimic_img=MIMIC_IMG,
                 batch_size=1, learning_rate=LEARNING_RATE,
                 max_iterations=MAX_ITERATIONS, initial_const=INITIAL_CONST,
                 intensity_range=INTENSITY_RANGE, l_threshold=L_THRESHOLD,
                 max_val=MAX_VAL, keep_final=KEEP_FINAL, maximize=MAXIMIZE, image_shape=IMAGE_SHAPE, verbose=1,
                 ratio=RATIO, limit_dist=LIMIT_DIST, loss_method=LOSS_TYPE, tanh_process=True,
                 save_last_on_failed=True):
        assert intensity_range in {'raw', 'imagenet', 'inception', 'mnist'}

        # constant used for tanh transformation to avoid corner cases
        self.it = 0
        self.tanh_constant = 2 - 1e-6
        self.save_last_on_failed = save_last_on_failed
        self.MIMIC_IMG = mimic_img
        self.LEARNING_RATE = learning_rate
        self.MAX_ITERATIONS = max_iterations
        self.initial_const = initial_const
        self.batch_size = batch_size
        self.intensity_range = intensity_range
        self.l_threshold = l_threshold
        self.max_val = max_val
        self.keep_final = keep_final
        self.verbose = verbose
        self.maximize = maximize
        self.learning_rate = learning_rate
        self.ratio = ratio
        self.limit_dist = limit_dist
        self.single_shape = list(image_shape)
        self.bottleneck_models = bottleneck_model_ls
        self.loss_method = loss_method
        self.tanh_process = tanh_process

    @staticmethod
    def resize_tensor(input_tensor, model_input_shape):
        if input_tensor.shape[1:] == model_input_shape or model_input_shape[1] is None:
            return input_tensor
        resized_tensor = tf.image.resize(input_tensor, model_input_shape[:2])
        return resized_tensor
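
    # The next two helpers implement a change of variables: the optimizer works
    # on images lifted into arctanh space, so that mapping back through tanh
    # keeps every pixel inside [0, 255] without explicit clipping (the same
    # trick used by the Carlini-Wagner attack).  tanh_constant = 2 - 1e-6
    # keeps the scaled values strictly inside (-1, 1) before np.arctanh.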
    def preprocess_arctanh(self, imgs):
        """ Map images into arctanh space. """
        imgs = reverse_preprocess(imgs, self.intensity_range)
        imgs = imgs / 255.0
        imgs = imgs - 0.5
        imgs = imgs * self.tanh_constant
        tanh_imgs = np.arctanh(imgs)
        return tanh_imgs

    def reverse_arctanh(self, imgs):
        raw_img = (tf.tanh(imgs) / self.tanh_constant + 0.5) * 255
        return raw_img

    def input_space_process(self, img):
        if self.intensity_range == 'imagenet':
            mean = np.repeat([[[[103.939, 116.779, 123.68]]]], len(img), axis=0)
            raw_img = (img[..., ::-1] - mean)
        else:
            raw_img = img
        return raw_img

    def clipping(self, imgs):
        imgs = reverse_preprocess(imgs, self.intensity_range)
        imgs = np.clip(imgs, 0, self.max_val)
        imgs = preprocess(imgs, self.intensity_range)
        return imgs
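
    # Input-space (perceptual) distance: tf.image.ssim gives a per-image
    # similarity in [-1, 1]; (1 - SSIM) / 2 converts it to a DSSIM distance in
    # [0, 1], and the hinge max(DSSIM - l_threshold, 0) only penalizes cloaks
    # whose visible change exceeds the perceptual budget l_threshold.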
    def calc_dissim(self, source_raw, source_mod_raw):
        msssim_split = tf.image.ssim(source_raw, source_mod_raw, max_val=255.0)
        dist_raw = (1.0 - tf.stack(msssim_split)) / 2.0
        dist = tf.maximum(dist_raw - self.l_threshold, 0.0)
        dist_raw_avg = tf.reduce_mean(dist_raw)
        dist_sum = tf.reduce_sum(dist)
        return dist, dist_raw, dist_sum, dist_raw_avg
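
    # Feature-space distance: for each extractor, the squared L2 distance
    # between the cloaked image's embedding and the reference embedding
    # (the target's, or the original's when maximizing) is divided by the
    # reference embedding's L2 norm and accumulated across extractors.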
    def calc_bottlesim(self, tape, source_raw, target_raw, original_raw):
        """ Original Fawkes loss function. """
        bottlesim = 0.0
        bottlesim_sum = 0.0

        # make sure everything is the right size.
        model_input_shape = self.single_shape
        cur_aimg_input = self.resize_tensor(source_raw, model_input_shape)
        if target_raw is not None:
            cur_timg_input = self.resize_tensor(target_raw, model_input_shape)

        for bottleneck_model in self.bottleneck_models:
            if tape is not None:
                try:
                    tape.watch(bottleneck_model.model.variables)
                except AttributeError:
                    tape.watch(bottleneck_model.variables)

            # get the respective feature space reprs.
            bottleneck_a = bottleneck_model(cur_aimg_input)
            if self.maximize:
                bottleneck_s = bottleneck_model(original_raw)
                bottleneck_diff = bottleneck_a - bottleneck_s
                scale_factor = tf.sqrt(tf.reduce_sum(tf.square(bottleneck_s), axis=1))
            else:
                bottleneck_t = bottleneck_model(cur_timg_input)
                bottleneck_diff = bottleneck_t - bottleneck_a
                scale_factor = tf.sqrt(tf.reduce_sum(tf.square(bottleneck_t), axis=1))

            cur_bottlesim = tf.reduce_sum(tf.square(bottleneck_diff), axis=1)
            cur_bottlesim = cur_bottlesim / scale_factor
            bottlesim += cur_bottlesim
            bottlesim_sum += tf.reduce_sum(cur_bottlesim)

        return bottlesim, bottlesim_sum

    def compute_feature_loss(self, tape, aimg_raw, simg_raw, aimg_input, timg_input, simg_input):
        """ Compute input space + feature space loss. """
        input_space_loss, dist_raw, input_space_loss_sum, input_space_loss_raw_avg = self.calc_dissim(aimg_raw,
                                                                                                      simg_raw)
        feature_space_loss, feature_space_loss_sum = self.calc_bottlesim(tape, aimg_input, timg_input, simg_input)

        if self.maximize:
            loss = self.const * tf.square(input_space_loss) - feature_space_loss * self.const_diff
        else:
            # penalize DSSIM above the budget and pull the features toward the target
            loss = self.const * tf.square(input_space_loss) + 1000 * feature_space_loss

        loss_sum = tf.reduce_sum(loss)
        return loss_sum, feature_space_loss, input_space_loss_raw_avg, dist_raw

    def compute(self, source_imgs, target_imgs=None):
        """ Main function that runs cloak generation. """
        start_time = time.time()
        adv_imgs = []
        for idx in range(0, len(source_imgs), self.batch_size):
            print('processing image %d at %s' % (idx + 1, datetime.datetime.now()))
            adv_img = self.compute_batch(source_imgs[idx:idx + self.batch_size],
                                         target_imgs[idx:idx + self.batch_size] if target_imgs is not None else None)
            adv_imgs.extend(adv_img)

        elapsed_time = time.time() - start_time
        print('protection cost %f s' % elapsed_time)
        return np.array(adv_imgs)

    def compute_batch(self, source_imgs, target_imgs=None, retry=True):
        """ TF2 method to generate the cloak. """
        # preprocess images.
        global progressbar
        nb_imgs = source_imgs.shape[0]

        # make sure source/target images are an array
        source_imgs = np.array(source_imgs, dtype=np.float32)
        if target_imgs is not None:
            target_imgs = np.array(target_imgs, dtype=np.float32)

        # metrics to test
        best_bottlesim = [0] * nb_imgs if self.maximize else [np.inf] * nb_imgs
        best_adv = np.zeros(source_imgs.shape)

        # convert to tanh-space
        simg_tanh = self.preprocess_arctanh(source_imgs)
        if target_imgs is not None:
            timg_tanh = self.preprocess_arctanh(target_imgs)

        self.modifier = tf.Variable(np.random.uniform(-1, 1, tuple([len(source_imgs)] + self.single_shape)) * 1e-4,
                                    dtype=tf.float32)

        # make the optimizer
        optimizer = tf.keras.optimizers.Adadelta(float(self.learning_rate))
        const_numpy = np.ones(len(source_imgs)) * self.initial_const
        self.const = tf.Variable(const_numpy, dtype=np.float32)

        const_diff_numpy = np.ones(len(source_imgs)) * 1.0
        self.const_diff = tf.Variable(const_diff_numpy, dtype=np.float32)

        # progress bar shown when verbose == 0
        if self.verbose == 0:
            progressbar = Progbar(
                self.MAX_ITERATIONS, width=30, verbose=1
            )

        # watch relevant variables.
        simg_tanh = tf.Variable(simg_tanh, dtype=np.float32)
        simg_raw = tf.Variable(source_imgs, dtype=np.float32)
        if target_imgs is not None:
            timg_raw = tf.Variable(timg_tanh, dtype=np.float32)

        # run the attack
        outside_list = np.ones(len(source_imgs))
        self.it = 0

        while self.it < self.MAX_ITERATIONS:
            self.it += 1
            with tf.GradientTape(persistent=True) as tape:
                tape.watch(self.modifier)
                tape.watch(simg_tanh)

                # Convert from tanh for DISSIM
                aimg_raw = self.reverse_arctanh(simg_tanh + self.modifier)

                actual_modifier = aimg_raw - simg_raw
                actual_modifier = tf.clip_by_value(actual_modifier, -15.0, 15.0)
                aimg_raw = simg_raw + actual_modifier

                simg_raw = self.reverse_arctanh(simg_tanh)

                # Convert further preprocess for bottleneck
                aimg_input = self.input_space_process(aimg_raw)
                if target_imgs is not None:
                    timg_input = self.input_space_process(timg_raw)
                else:
                    timg_input = None
                simg_input = self.input_space_process(simg_raw)

                # get the feature space loss.
                loss, internal_dist, input_dist_avg, dist_raw = self.compute_feature_loss(
                    tape, aimg_raw, simg_raw, aimg_input, timg_input, simg_input)

            # compute gradients
            grad = tape.gradient(loss, [self.modifier])
            optimizer.apply_gradients(zip(grad, [self.modifier]))

            if self.it == 1:
                self.modifier = tf.Variable(self.modifier - tf.sign(grad[0]) * 0.01, dtype=tf.float32)
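
            # Per-image budget controller: const_diff doubles while the DSSIM
            # stays comfortably under the budget (<= 0.9 * l_threshold) and
            # halves once it overshoots (>= 1.1 * l_threshold); crossing from
            # one side of the budget to the other resets it to 1.  The result
            # feeds self.const_diff, which rescales the feature-space term of
            # the loss when maximizing.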
            for e, (input_dist, feature_d, mod_img) in enumerate(zip(dist_raw, internal_dist, aimg_input)):
                if e >= nb_imgs:
                    break
                input_dist = input_dist.numpy()
                feature_d = feature_d.numpy()

                if input_dist <= self.l_threshold * 0.9 and const_diff_numpy[e] <= 129:
                    const_diff_numpy[e] *= 2
                    if outside_list[e] == -1:
                        const_diff_numpy[e] = 1
                    outside_list[e] = 1
                elif input_dist >= self.l_threshold * 1.1 and const_diff_numpy[e] >= 1 / 129:
                    const_diff_numpy[e] /= 2
                    if outside_list[e] == 1:
                        const_diff_numpy[e] = 1
                    outside_list[e] = -1
                else:
                    const_diff_numpy[e] = 1.0
                    outside_list[e] = 0

                if input_dist <= self.l_threshold * 1.1 and (
                        (feature_d < best_bottlesim[e] and (not self.maximize)) or (
                        feature_d > best_bottlesim[e] and self.maximize)):
                    best_bottlesim[e] = feature_d
                    best_adv[e] = mod_img

            self.const_diff = tf.Variable(const_diff_numpy, dtype=np.float32)

            if self.verbose == 1:
                print("ITER {:0.2f}  Total Loss: {:.2f} {:0.4f} raw; diff: {:.4f}".format(self.it, float(loss),
                                                                                          float(input_dist_avg),
                                                                                          np.mean(internal_dist)))

            if self.verbose == 0:
                progressbar.update(self.it)

        if self.verbose == 1:
            print("Final diff: {:.4f}".format(np.mean(best_bottlesim)))
        print("\n")

        if self.save_last_on_failed:
            for e, diff in enumerate(best_bottlesim):
                if diff < 0.3 and dist_raw[e] < 0.015 and internal_dist[e] > diff:
                    best_adv[e] = aimg_input[e]

        best_adv = self.clipping(best_adv[:nb_imgs])
        return best_adv
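

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the Fawkes pipeline): shows how
# FawkesMaskGeneration is typically driven.  The feature extractor below is a
# stand-in assumption -- any tf.keras model mapping (112, 112, 3) images to a
# flat feature vector works here; in the real pipeline, protection.py supplies
# Fawkes' own extractor and aligned source/target faces.  Assumes the TF 2.x /
# Keras environment this module already targets.
if __name__ == "__main__":
    # Placeholder extractor: global average pooling followed by a random
    # 512-d projection (hypothetical, for demonstration only).
    extractor = tf.keras.Sequential([
        tf.keras.Input(shape=(112, 112, 3)),
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(512, use_bias=False),
    ])

    # Two random "faces", already mapped into the 'imagenet' input space used
    # throughout this module.
    rng = np.random.RandomState(0)
    source = preprocess(rng.uniform(0, 255, (2, 112, 112, 3)).astype(np.float32), 'imagenet')
    target = preprocess(rng.uniform(0, 255, (2, 112, 112, 3)).astype(np.float32), 'imagenet')

    differentiator = FawkesMaskGeneration(
        [extractor],
        batch_size=2,
        max_iterations=10,  # tiny run just to exercise the loop
        l_threshold=0.03,
        intensity_range='imagenet',
        image_shape=(112, 112, 3),
        verbose=1,
    )
    cloaked = differentiator.compute(source, target)
    print("cloaked batch shape:", cloaked.shape)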