Etude des panneaux d'affichage de places de parking
![]() | |
---|---|
Titre du projet | Etude des panneaux d'affichage de places de parking |
Cadre | Projets Réseaux Mobiles et Avancés
|
Équipe | Pierre Le Pailleur, Marie-Alix Juffard |
Encadrants | Franck Rousseau |
Sommaire
Introduction
Dans le cadre du projet Réseaux mobiles et avancés, nous avons décidé de nous intéresser aux panneaux de parkings, aussi appelés panneaux à jalonnement dynamique.
Nous sommes partis d'un projet de l'année dernière [1] portant sur la rétro-ingénierie de signaux radio.
Le but de notre projet était d'écouter les signaux envoyés aux panneaux à jalonnement dynamique, de les comprendre et pourquoi pas d'en forger de nouveaux pour modifier les affichages des panneaux, à l'image d'un hack déjà réalisé en 2015 [2]
Matériels et outils
Nous avons utilisé le même matériel que le projet dont nous nous sommes inspirés, vous trouverez une description complète et détaillée ici : [3]
Matériels
Nous avons utilisé une radio logicielle (ou SDR pour Software Defined Radio en anglais). Le principe d'une telle radio est de déléguer le traitement du signal à un logiciel au lieu de le réaliser de manière électronique. Ici, nous avons utilisé la RTL qui est un tuner DVB-T TV branchable par clef USB.
- RTL-SDR
- Antenne
Outils logiciels
Nous avons utilisé principalement un logiciel libre : Gqrx. Ce logiciel implémente de manière logicielle les composants électroniques nécessaires au traitement du signal. Gqrx est basé sur GNU Radio mais il est plus facile d'utilisation que ce dernier. L'utilisation de ces deux logiciels est détaillée ici : [4].
De plus, nous avons utilisé Audacity pour la visualisation des fichiers .wav obtenus grâce au logiciel précédemment cité.
- gqrx
- audacity
- Scripts réalisés en Python
Protocole de réalisation des mesures
Les mesures ont été réalisée Porte de France à Grenoble, en combinant écoute des signaux radio avec le logiciel gqrx, la clé usb RTL-SDR et l'antenne, et observation des 5 panneaux à jalonnement dynamique.
Détermination de la fréquence d'émission des signaux :
- Observation du type de modification des valeurs des tableaux (modification périodique) puis de la période de modification (1 min 25)
- Écoute sur les différentes bandes de fréquences typiques de ce genre d'émission, recherche d'un signal envoyé périodiquement, avec une période inférieure à celle de modification des panneaux
Enregistrement des mesures :
- Utilisation du logiciel gqrx pour enregistrer les signaux, ainsi que d'un logiciel de traitement de texte pour noter les changements de valeur des tableaux
- Protocole pour un ensemble de mesures : enregistrement de tous les signaux entre deux changements de valeur des tableaux puis notation des anciennes et nouvelles valeurs
Observations
- Fréquence d’affichage régulière sur les panneaux toutes les 1min25
- Fréquence émission: 867,9 MHz
- Signaux reçu aussi de manière régulière
Hypothèses:
- Changement de l’affichage à la réception d’une trame --> hypothèse invalidée, le changement de l'affichage est périodique
- Une trame par panneau dont la valeur change --> hypothèse invalidée, on peut recevoir plusieurs trames avant modification des valeurs
- Émission dans toute la ville --> hypothèse validée, on capte le même signal depuis l'Ensimag et d'autres points du centre-ville
- Les valeurs des 5 panneaux se trouvent dans chaque trame --> les trames sont courtes pour tout contenir...
- Signal émis en Manchester --> étude du signal en Manchester differential
- Données transmises en clair / chiffrement XOR
- Valeurs des panneaux en binaire ou en ascii
Décisions prises pour l'exploitation des trames :
- Différents essais Manchester/Manchester differential
- Tests en bruteforce pour savoir si les données son transmises en clair ou en XOR
- Tests pour savoir si les valeurs des panneaux sont transmises en binaire ou en ascii
- Renvoi des trames reçues pour voir si les valeurs des panneaux changent (finalement non testé, pas de matériel pour émettre)
Traitement et analyse des signaux, mode d'emploi
Pré-traitement
Une fois les signaux originaux obtenus, à l'aide d'audacity, nous commençons par isoler une trame dans le signal et nous suggérons de la renommer à l'aide de la fonction export audio avec une norme qui permet de garder l'ordre chronologique telle que [numTrame]P[numPanneau].wav.
Voici à quoi ressemble alors les signaux :
Afin d’écrêter ce signal, nous executons le script python wav_to_rec_signal sur la trame isolée :
$python wav_to_rec_signal.py [numTrame]P[numPanneau].wav -o [numTrame]P[numPanneau]_a.wav
Voilà à quoi ressemble alors les signaux :
Il est souvent nécessaire d'effectuer un traitement manuel pour le début et la fin du signal afin de pouvoir ajuster la propreté du signal pour la suite.
Manchester / Manchester differential
Nous pouvons générer les fichiers textes contenant les octets décodés. Pour cela nous utilisons decode_wav.py pour décoder les trames façon Manchester ou decoder_differential_manchester.py pour du Manchester differential :
$python decode_wav.py [numTrame]P[numPanneau]_a.wav > P[numPanneau]res[numTrame].txt
Le fonctionnement du Manchester est détaillé dans le projet dont celui ci prend la suite, mais pour le résumer, il s'agit d'assimiler les transitions à un bit : 0 -> 1 = 1 et 1 -> 0 = 0
Tandis que dans Manchester, nous considérons la présence (pour 1)/ absence (pour 0) de transition, comme suivant :
Décryptage des octets obtenus
Comme nous faisions toujours face à beaucoup de transitions même après avoir récupéré les octets à travers l’hypothèse Manchester differential, nous avons essayer de chercher brutalement si nous pouvions trouver à l'aide d'un XOR si nous pouvions trouver le code ascii ou les valeurs des panneaux avec des entiers, dans les fichiers textes contenant les octets (voir la section Scripts).
Résultats et comparaisons des trames obtenues
En analysant les différentes trames obtenues et en les comparant avec les valeurs des panneaux au moment des mesures, on obtient ce genre de résultats, en hexadécimal :
Scripts et données récupérées
Selon les cas d'utilisation, il faudra mettre à jour / adapter les scripts suivants :
- wav_to_rec_signal.py : script récupéré d'un groupe de l'année précédente pour traiter les signaux et les rendre facilement exploitable.
wav_to_rec.py |
import wave
import struct
import argparse
parser = argparse.ArgumentParser(description="Signaux recu en numerique")
parser.add_argument("filename", help="Fichier wav en entree")
parser.add_argument("-o", "--out", help="Fichier wav de sortie. Par defaut \"out.wav\"", default="out.wav")
args = parser.parse_args()
SIZE_TO_FMT = {1:'B', 2:'H', 4:'I', 8:'Q'}
wv = wave.open(args.filename)
sw = wv.getsampwidth()
fmt = SIZE_TO_FMT[sw]
nc = wv.getnchannels()
nf = wv.getnframes()
# COMPUTE THE AVG
l = [[]] * sw
for _ in xrange(nf):
frame = wv.readframes(1)
for i in xrange(nc):
v = struct.unpack("<"+fmt, frame[i*sw:(i+1)*sw])[0]
l[i].append(v)
avg = [sum(v) / len(v) for v in l]
out = wave.open(args.out,"w")
out.setparams(wv.getparams())
wv.rewind()
for _ in xrange(nf):
frame = wv.readframes(1)
v2 = ""
for i in xrange(nc):
v = struct.unpack("<"+fmt, frame[i*sw:(i+1)*sw])[0]
if v > avg[i]:
v2 += "\x00"*sw
else:
v2 += "\x30"*sw
out.writeframes(v2)
out.close()
wv.close()
|
- decode_wav.py : pour décoder des signaux exploitables façon Manchester
decode_wav.py |
import argparse
import wave
import struct
from itertools import groupby
import sys
import scipy.io.wavfile
parser = argparse.ArgumentParser(description="Decoder")
parser.add_argument("filename", help="Fichier wav")
parser.add_argument("-s", "--seuil-long-court", help="Seuil entre demi temps et temps complet en nombre de points wav", default=25)
parser.add_argument("-p", "--seuil-pause", help="Seuil entre emission et pause entre question en nombre de points wav", default=150)
args = parser.parse_args()
SEUIL_LONG_COURT = args.seuil_long_court
SEUIL_PAUSE = args.seuil_pause
SIZE_TO_FMT = {1:'B', 2:'H', 4:'I', 8:'Q'}
SEUIL_SPLIT_LEN = 33
SEUIL_SPLIT_SIGNAL = 100
wv = wave.open(args.filename)
sw = wv.getsampwidth()
print('sw: ' + str(sw))
fmt = SIZE_TO_FMT[sw]
print('fmt: ' + str(fmt))
nc = wv.getnchannels()
print('nc: ' + str(nc))
nf = wv.getnframes()
print('nf: ' + str(nf))
avg = 6000
print('avg: ' + str(avg))
data = scipy.io.wavfile.read(args.filename)
print(len(data[1]))
list_frames = []
manchester_coded_binaries = []
bin_values = []
nb_err = 0
for i in range(len(data[1])):
try:
value = str(data[1][i]).split(' ')[0]
value = value.replace('[', '')
value = int(value)
except:
try:
value = str(data[1][i]).split(' ')[-1]
value = value.replace('[', '').replace(']', '')
value = int(value)
except:
print('HERE: ' + str(data[1][i]) + 'there')
if value > avg:
bin_values.append(1)
else:
bin_values.append(0)
current_len = 0
current_bin = 0
for bin_val in bin_values:
if bin_val == current_bin:
current_len += 1
else:
if current_len < SEUIL_SPLIT_LEN:
manchester_coded_binaries.append(current_bin)
elif current_len > SEUIL_SPLIT_LEN and current_len < SEUIL_SPLIT_SIGNAL:
manchester_coded_binaries.append(current_bin)
manchester_coded_binaries.append(current_bin)
elif current_len > SEUIL_SPLIT_SIGNAL:
list_frames.append(manchester_coded_binaries)
manchester_coded_binaries = []
current_len = 0
current_bin = 1 - current_bin
list_frames.append(manchester_coded_binaries)
for i in range(len(list_frames)):
manchester_coded_binaries = list_frames[i]
try:
if manchester_coded_binaries[0] == manchester_coded_binaries[1]:
if manchester_coded_binaries[0] == 0:
manchester_coded_binaries.pop(0)
else:
manchester_coded_binaries.insert(0, 0)
except:
print("error: " + str(manchester_coded_binaries))
print('coded bin: ' + str(manchester_coded_binaries).replace('[', '').replace(']','').replace(' ','').replace(',',''))
decoded_bin = []
previous_value = None
for i in range(len(manchester_coded_binaries)):
current_value = manchester_coded_binaries[i]
if i%2 == 0:
previous_value = manchester_coded_binaries[i]
else:
if previous_value == 1 and current_value == 0:
decoded_bin.append(1)
elif previous_value == 0 and current_value == 1:
decoded_bin.append(0)
else:
print("VALUE ERROR")
print('decoded bin: ' + str(decoded_bin))
affichage = ''
for i in range(len(decoded_bin)):
affichage += str(decoded_bin[i])
if (i + 1) % 8 == 0 and i != 0:
affichage += '\n'
print('decoded manchester octets:\n' + affichage)
wv.close()
def old_main():
l = []
for _ in xrange(nf):
frame = wv.readframes(1)
v = struct.unpack("<"+fmt, frame[:sw])[0]
l.append(v)
avg = sum(l) / len(l)
print('l: ' + str(l))
print('len(l): ' + str(len(l)))
print('avg: ' + str(avg))
wv.rewind()
l = []
for _ in xrange(nf):
frame = wv.readframes(1)
for i in xrange(1):
v = struct.unpack("<"+fmt, frame[i*sw:(i+1)*sw])[0]
if v > avg:
l.append('l') # court
else:
l.append('c') # long
print('l: ' + str(l))
print('len(l): ' + str(len(l)))
# Group values
l = [(v, len(list(k))) for (v,k) in groupby(l)]
print('l: ' + str(l))
print('len(l): ' + str(len(l)))
# Cut packet and keep only high value
i = 0
paquets = [[]]
for (c, v) in l:
if v > SEUIL_PAUSE:
i += 1
paquets.append([])
else:
if c == 'l':
paquets[i].append(v)
print('paquets: ' + str(paquets))
print('len(paquets): ' + str(len(paquets)))
# Remove empty paquets
paquets = [p for p in paquets if p != []]
print('paquets: ' + str(paquets))
print('len(paquets): ' + str(len(paquets)))
# Compute values
for p in paquets:
decoded_values = [0 if v<SEUIL_LONG_COURT else 1 for v in p]
print decoded_values
wv.close()
|
- decode_differential_manchester.py : idem mais en Manchester Differential, la partie qui change est la suivante, dans la boucle de décodage :
decode_differential_manchester.py |
decoded_bin = []
previous_value = None
for i in range(len(manchester_coded_binaries)):
current_value = manchester_coded_binaries[i]
if i%2 == 0:
previous_value = manchester_coded_binaries[i]
else:
if (previous_value == 1 and current_value == 0) or (previous_value == 0 and current_value == 1):
decoded_bin.append(1)
elif (previous_value == 1 and current_value == 1) or (previous_value == 0 and current_value == 0):
decoded_bin.append(0)
else:
print("VALUE ERROR")
|
- auto_generate_octets.py : pour simplifier la création des fichiers txt contenant les octets seulement, en automatisant l'exécution du script pour décoder les signaux exploitables.
auto_generate_octets.py |
import os
GLOBAL_PATH = os.path.join('..', 'mesures20janvRec')
if __name__ == '__main__':
FILES_TO_ANALYSE = os.listdir(GLOBAL_PATH)
print(FILES_TO_ANALYSE)
for file_wav in FILES_TO_ANALYSE:
new_file = 'P'
numero = file_wav.split('P')[1].split('_')[0]
if int(numero) < 10:
new_file += '0'
new_file += numero + 'res' + file_wav.split('P')[0] + '.txt'
command = 'python ../script/decoder_differential_machester.py ../mesures20janvRec/' + file_wav + ' > ../Pairs_script/octets/' + new_file
print(command)
os.system(command)
|
- move_octets.py : pour simplifier un décalage de bit pour essayer différentes combinaisons d'octets lors de la phase de compréhension des messages.
move_octets.py |
if __name__ == '__main__':
for file_mod in FILES_TO_ANALYSE:
with open(file_mod, 'r') as file_to_read:
data = file_to_read.read().replace('\n','').replace('0', '', 1) + '0'
i = 0
new_data = ''
for i in range(len(data)):
if i != 0 and i % 8 == 0:
new_data += '\n'
new_data += data[i]
new_file_name = os.path.dirname(file_mod) + '\\' + os.path.basename(file_mod).replace('.txt', '_other.txt')
with open(new_file_name, 'w') as file_to_write:
file_to_write.write(new_data)
|
- bruteforce.py : Une des versions pour essayer de retrouver des nombres / digit en ascii ou pas, à base de xor.
bruteforce.py |
import operator
NUMBER_OF_BIT_MAX = 16
list_keys = []
list_octets = []
list_decoded_octets = []
# LIST_VALUES_TO_COMPARE = ['278', '153', '80', '6'] #octets xor brute force 1
LIST_VALUES_TO_COMPARE = ['290', '158', '92', '35', '27'] #octets xor brute force 2
# LIST_VALUES_TO_COMPARE = ['291', '159', '94', '33', '28'] #octets xor brute force 3
# LIST_VALUES_TO_COMPARE = ['291', '161', '92', '30', '27'] #octets xor brute force 4
# LIST_VALUES_TO_COMPARE = ['292', '158', '91', '25', '27'] #octets xor brute force 5
# LIST_VALUES_TO_COMPARE = ['292', '159', '87', '22', '24'] #octets xor brute force 6
# LIST_VALUES_TO_COMPARE = ['293', '159', '89', '15', '21'] #octets xor brute force 7
# LIST_VALUES_TO_COMPARE = ['293', '160', '88', '11', '17'] #octets xor brute force 8
# LIST_VALUES_TO_COMPARE = ['1', '2', '3', '4', '5', '6', '7', '0'] # test
FILE_OCTETS = "C:\\\\Users\\pierre.lepailleur\\Documents\\Ensimag\\git_reseaux\\reseaux\\DeuxiemeJeuDeMesures\\octets_for_bruteforce_xor\\octets_diff_manch2.txt"
dict_digit_to_seek = {}
def val_in_list(value_seeked, list):
for i in range(len(list)):
if NUMBER_OF_BIT_MAX == 8:
if value_seeked == list[i]:
return True
elif NUMBER_OF_BIT_MAX == 16:
if value_seeked == list[i][:8]:
return True
elif value_seeked == list[i][8:]:
return True
return False
def comparison(list_decoded_octets):
nb_val_exact = 0
for digit, occurences in dict_digit_to_seek.iteritems():
nb_temp = 0
seek_val = format(ord(digit), '#010b').replace('0b', '')
while val_in_list(seek_val, list_decoded_octets):
for o in list_decoded_octets:
if NUMBER_OF_BIT_MAX == 16:
if seek_val == o[:8]:
try:
list_decoded_octets.append(o[8:])
except:
pass
list_decoded_octets.remove(o)
elif seek_val == o[8:]:
try:
list_decoded_octets.append(o[:8])
except:
pass
list_decoded_octets.remove(o)
elif NUMBER_OF_BIT_MAX == 8:
if seek_val in o:
list_decoded_octets.append(o.replace(seek_val, 'N', 1))
list_decoded_octets.remove(o)
nb_temp += 1
if occurences == nb_temp:
nb_val_exact += 1
return nb_val_exact
def parse_digit():
for val in LIST_VALUES_TO_COMPARE:
for ch in val:
if ch in dict_digit_to_seek.iterkeys():
dict_digit_to_seek[ch] += 1
else:
dict_digit_to_seek[ch] = 1
print LIST_VALUES_TO_COMPARE
print dict_digit_to_seek
def xor_to_str(octet_from_file, key):
if NUMBER_OF_BIT_MAX == 16:
return str(format(octet_from_file ^ key, '#018b'))
elif NUMBER_OF_BIT_MAX == 8:
return str(format(octet_from_file ^ key, '#010b'))
def extract_octets_from_file(file_octet):
global list_octets
line = None
with open(file_octet, 'r') as file_to_read:
while line != "":
line = file_to_read.readline()
if line == '':
continue
octet = line.replace('\n', '')
octet = int(octet, 2)
list_octets.append(octet)
if NUMBER_OF_BIT_MAX == 16:
new_list_octets = []
ind = 0
o_temp = ''
for octet in list_octets:
if ind % 2 == 1:
o_temp += octet * 2**8
new_list_octets.append(o_temp)
o_temp = ''
else:
o_temp = octet
ind += 1
list_octets = new_list_octets
def top_results(results):
res = ""
if (len(results) > 0):
cle = max(results.iteritems(), key=operator.itemgetter(1))[0]
val = results[cle]
for key, value in results.iteritems():
if value == val:
res += "cle : "
res += (str(bin(key)))
res += " ; valeur : "
res += (str(val))
res += "\n"
return (res)
def exact_results(results):
seek_res = 0
for i in dict_digit_to_seek.itervalues():
seek_res += i
print("NB DIGIT: " + str(seek_res))
for key, value in results.iteritems():
if value == seek_res:
print("cle : " + str(key) + " value : " + str(value))
def main():
parse_digit()
dict_results = {}
list_keys = [i for i in range(2 ** NUMBER_OF_BIT_MAX)]
for key in list_keys:
list_decoded_octets = []
for octet in list_octets:
decoded_octets = xor_to_str(octet, key)
list_decoded_octets.append(decoded_octets.replace(' ', '').replace('0b', ''))
string_decoded = ""
for octet_d in list_decoded_octets:
string_decoded += str(octet_d).replace('0b', '')
dict_results[key] = comparison(list_decoded_octets)
list_key_to_delete = []
for keydict, valdict in dict_results.iteritems():
if valdict == 0:
list_key_to_delete.append(keydict)
for k_delete in list_key_to_delete:
del dict_results[k_delete]
print("TOP RESULTS : " + top_results(dict_results))
if __name__ == '__main__':
extract_octets_from_file(FILE_OCTETS)
main()
|
- analyse_octets.py : Une des versions utilisées pour analyser et comparer les octets récupérés après la chaine de traitement.
analyse_octets.py |
import os
FILE_ALL_OCTETS = "C:\\\\Users\\pierre.lepailleur\\Documents\\Ensimag\\git_reseaux\\reseaux\\TroisiemeJeuDeMesures\\all_octets_diff_manch.txt"
GLOBAL_PATH = "C:\\\\Users\\pierre.lepailleur\\Documents\\Ensimag\\git_reseaux\\reseaux\\TroisiemeJeuDeMesures\\octets\\"
FILES_TO_ANALYSE = []
dict_octets = {}
def build_octets_dict():
global dict_octets
line = None
with open(FILE_ALL_OCTETS, 'r') as file_to_read:
while line != "":
line = file_to_read.readline()
if line == '':
continue
octet = line.replace('\n', '')
if not octet in dict_octets:
dict_octets[octet] = str(len(dict_octets))
try:
del dict_octets['']
except:
pass
print(dict_octets)
def parse_octets(file_oct):
line = None
res = ''
with open(file_oct, 'r') as file_to_read:
c = 0
while line != "":
line = file_to_read.readline()
if '0' in line or '1' in line:
c += 1
if line == '' or c == 21:
break
try:
key = line.replace('\n', '').replace(' ', '')
res += dict_octets[key].replace('0x', '').zfill(2) + ' '
except:
pass
return res
def dict_to_hex():
for key, val in dict_octets.iteritems():
try:
dict_octets[key] = hex(int(key, 2))
except:
pass
print dict_octets
def clear_res(results):
clear_result = ''
line_top = None
for line in results.split('\n'):
if line_top == None:
line_top = line
clear_result += line
clear_result += '\n'
continue
else:
line_current = line.split(' ')[0].split(' ')
line_top = line_top.split(' ')[0].split(' ')
for i in range(len(line_current)):
try:
if line_current[i] == line_top[i]:
clear_result += '__ '
else:
clear_result += line_current[i] + ' '
except:
print("error with: current: " + str(line_current) + ' top: ' + str(line_top))
line_top = line
try:
clear_result += ' ' + str(line.split(' ')[1]) + '\n'
except:
pass
clear_result += '\n'
return clear_result
if __name__ == '__main__':
build_octets_dict()
dict_to_hex()
global_result = ''
FILES_TO_ANALYSE = os.listdir(GLOBAL_PATH)
for i in range(len(FILES_TO_ANALYSE)):
file_t = GLOBAL_PATH + FILES_TO_ANALYSE[i]
result = parse_octets(file_t)
result += ' file: ' + str(FILES_TO_ANALYSE[i])
print(result)
global_result += result + '\n'
print('\n\n' + clear_res(global_result))
|
Les archives des trois différents jeux de mesures réalisés, contenant les originaux, les signaux pré-traités, traités et les résultats d'analyse étant volumineux, vous pouvez nous contacter afin de les récupérer ou nous poser des questions si vous désirez poursuivre ce travail :
pierre.le-pailleur@grenoble-inp.fr
marie-alix.juffard@grenoble-inp.fr
Conclusion
Nous n'avons pas réussi à déchiffrer les signaux enregistrés dans le temps imparti. Cependant, nous avons exploité de nombreuses pistes et le travail est bien avancé. Les étudiants qui souhaiteraient reprendre ce projet pour le mener à terme peuvent nous contacter par mail, nous répondrons avec plaisir à leurs questions.