Coverage for nova/console/rfb/authvencrypt.py: 96%

67 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-24 11:16 +0000

1# Copyright (c) 2014-2016 Red Hat, Inc 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15import enum 

16import ssl 

17import struct 

18 

19from oslo_config import cfg 

20from oslo_log import log as logging 

21 

22from nova.console.rfb import auth 

23from nova import exception 

24from nova.i18n import _ 

25 

26LOG = logging.getLogger(__name__) 

27CONF = cfg.CONF 

28 

29 

class AuthVeNCryptSubtype(enum.IntEnum):
    """Numeric identifiers of the VeNCrypt authentication sub-types.

    The values mirror the VeNCrypt sub-type registry published in the
    community RFB protocol description:
    https://github.com/rfbproto/rfbproto/blob/master/rfbproto.rst

    Members are plain integers (``IntEnum``), so they can be compared with
    and packed alongside the raw values read from the wire.
    """

    # Sub-type without a TLS/X509 prefix.
    PLAIN = 256

    # TLS-prefixed sub-types.
    TLSNONE = 257
    TLSVNC = 258
    TLSPLAIN = 259

    # X509-prefixed sub-types.
    X509NONE = 260
    X509VNC = 261
    X509PLAIN = 262

    # SASL variants; they were assigned the last two identifiers.
    X509SASL = 263
    TLSSASL = 264

45 

46 

class RFBAuthSchemeVeNCrypt(auth.RFBAuthScheme):
    """A security proxy helper which uses VeNCrypt.

    This security proxy helper uses the VeNCrypt security
    type to achieve SSL/TLS-secured VNC. It supports both
    standard SSL/TLS encryption and SSL/TLS encryption with
    x509 authentication.

    Refer to https://www.berrange.com/~dan/vencrypt.txt for
    a brief overview of the protocol.
    """

    def security_type(self):
        """Return the RFB security type implemented by this scheme."""
        return auth.AuthType.VENCRYPT

    @staticmethod
    def _describe_subtype(sub_type):
        """Format a raw sub-type number as ``"<value> (<name>)"`` for logs.

        The server may advertise sub-types that are not members of
        ``AuthVeNCryptSubtype``; those are rendered as ``(unknown)`` rather
        than letting the enum lookup raise ``ValueError``.
        """
        try:
            name = AuthVeNCryptSubtype(sub_type).name
        except ValueError:
            name = 'unknown'
        return '%d (%s)' % (sub_type, name)

    def security_handshake(self, compute_sock):
        """Run the VeNCrypt handshake and upgrade the socket to TLS.

        Negotiates VeNCrypt version 0.2 and the X509NONE sub-type with the
        server on ``compute_sock``, then wraps the socket in TLS, verifying
        the server certificate against ``CONF.vnc.vencrypt_ca_certs``. A
        client certificate is presented only when both
        ``CONF.vnc.vencrypt_client_key`` and
        ``CONF.vnc.vencrypt_client_cert`` are set.

        :param compute_sock: connected socket to the compute node's VNC
            server, positioned at the start of the VeNCrypt exchange
        :returns: the TLS-wrapped socket
        :raises: exception.RFBAuthHandshakeFailed on a short read, an
            unsupported version or sub-type, a rejection by the server, or
            a TLS error
        """
        def recv(num):
            # A stream socket may legitimately return fewer bytes than
            # requested, so keep reading until we have them all. An empty
            # read means the peer closed the connection.
            chunks = []
            remaining = num
            while remaining > 0:
                chunk = compute_sock.recv(remaining)
                if not chunk:
                    break
                chunks.append(chunk)
                remaining -= len(chunk)
            b = b''.join(chunks)
            if len(b) != num:
                reason = _("Short read from compute socket, wanted "
                           "%(wanted)d bytes but got %(got)d") % {
                               'wanted': num, 'got': len(b)}
                raise exception.RFBAuthHandshakeFailed(reason=reason)
            return b

        # get the VeNCrypt version from the server
        maj_ver = ord(recv(1))
        min_ver = ord(recv(1))

        LOG.debug("Server sent VeNCrypt version "
                  "%(maj)s.%(min)s", {'maj': maj_ver, 'min': min_ver})

        if maj_ver != 0 or min_ver != 2:
            reason = _("Only VeNCrypt version 0.2 is supported by this "
                       "proxy, but the server wanted to use version "
                       "%(maj)s.%(min)s") % {'maj': maj_ver, 'min': min_ver}
            raise exception.RFBAuthHandshakeFailed(reason=reason)

        # use version 0.2
        compute_sock.sendall(b"\x00\x02")

        # the server answers with a single status byte; non-zero is refusal
        can_use_version = ord(recv(1))

        if can_use_version > 0:
            reason = _("Server could not use VeNCrypt version 0.2")
            raise exception.RFBAuthHandshakeFailed(reason=reason)

        # get the supported auth subtypes: a U8 count followed by that
        # many big-endian unsigned ints
        sub_types_cnt = ord(recv(1))
        sub_types_raw = recv(sub_types_cnt * auth.SUBTYPE_LENGTH)
        sub_types = struct.unpack('!' + str(sub_types_cnt) + 'I',
                                  sub_types_raw)

        LOG.debug(
            "Server supports VeNCrypt subtypes: %s",
            ', '.join(self._describe_subtype(t) for t in sub_types))

        # We use X509None as we're only seeking to encrypt the channel (ruling
        # out PLAIN) and prevent MITM (ruling out TLS*, which uses trivially
        # MITM'd Anonymous Diffie Hellmann (DH) cyphers)
        if AuthVeNCryptSubtype.X509NONE not in sub_types:
            reason = _(
                "Server does not support the {value} ({name}) "
                "VeNCrypt auth subtype"
            ).format(value=AuthVeNCryptSubtype.X509NONE.value,
                     name=AuthVeNCryptSubtype.X509NONE.name)
            raise exception.RFBAuthHandshakeFailed(reason=reason)

        LOG.debug(
            "Attempting to use the %d (%s) VeNCrypt auth subtype",
            AuthVeNCryptSubtype.X509NONE.value,
            AuthVeNCryptSubtype.X509NONE.name)

        compute_sock.sendall(struct.pack(
            '!I', AuthVeNCryptSubtype.X509NONE))

        # NB(sross): the spec is missing a U8 here that's used in
        # multiple implementations (e.g. QEMU, GTK-VNC). 1 means
        # acceptance, 0 means failure (unlike the rest of RFB)
        auth_accepted = ord(recv(1))
        if auth_accepted == 0:
            reason = _(
                "Server didn't accept the requested VeNCrypt auth subtype")
            raise exception.RFBAuthHandshakeFailed(reason=reason)

        LOG.debug("Server accepted the requested VeNCrypt auth subtype")

        # Only present a client certificate when both halves are configured.
        if CONF.vnc.vencrypt_client_key and CONF.vnc.vencrypt_client_cert:
            client_key = CONF.vnc.vencrypt_client_key
            client_cert = CONF.vnc.vencrypt_client_cert
        else:
            client_key = None
            client_cert = None

        try:
            # ssl.wrap_socket() was removed in Python 3.12, so build the
            # equivalent SSLContext explicitly.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # The previous ssl.wrap_socket() call never matched the server
            # host name against the certificate; keep that behaviour. This
            # must be disabled before wrapping without a server_hostname.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_REQUIRED
            if CONF.vnc.vencrypt_ca_certs:
                context.load_verify_locations(
                    cafile=CONF.vnc.vencrypt_ca_certs)
            if client_cert:
                context.load_cert_chain(
                    certfile=client_cert, keyfile=client_key)

            wrapped_sock = context.wrap_socket(
                compute_sock, server_side=False)

            LOG.info("VeNCrypt security handshake accepted")
            return wrapped_sock

        except ssl.SSLError as e:
            reason = _("Error establishing TLS connection to server: %s")
            raise exception.RFBAuthHandshakeFailed(
                reason=reason % str(e)) from e

158 reason = _("Error establishing TLS connection to server: %s") 

159 raise exception.RFBAuthHandshakeFailed(reason=reason % str(e))