Coverage for nova/console/rfb/authvencrypt.py: 96%
67 statements
coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
1# Copyright (c) 2014-2016 Red Hat, Inc
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15import enum
16import ssl
17import struct
19from oslo_config import cfg
20from oslo_log import log as logging
22from nova.console.rfb import auth
23from nova import exception
24from nova.i18n import _
# Global configuration object; the ``vnc`` option group is read from it
# when the TLS connection to the compute node is set up.
CONF = cfg.CONF

# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
class AuthVeNCryptSubtype(enum.IntEnum):
    """Numeric identifiers of the VeNCrypt authentication subtypes.

    Each member's value is the number exchanged on the wire during the
    VeNCrypt subtype negotiation. The list follows
    https://github.com/rfbproto/rfbproto/blob/master/rfbproto.rst
    """

    # No TLS at all.
    PLAIN = 256

    # TLS* subtypes.
    TLSNONE = 257
    TLSVNC = 258
    TLSPLAIN = 259

    # X509* subtypes.
    X509NONE = 260
    X509VNC = 261
    X509PLAIN = 262

    # SASL subtypes.
    X509SASL = 263
    TLSSASL = 264
class RFBAuthSchemeVeNCrypt(auth.RFBAuthScheme):
    """A security proxy helper which uses VeNCrypt.

    This security proxy helper uses the VeNCrypt security
    type to achieve SSL/TLS-secured VNC. It supports both
    standard SSL/TLS encryption and SSL/TLS encryption with
    x509 authentication.

    Refer to https://www.berrange.com/~dan/vencrypt.txt for
    a brief overview of the protocol.
    """

    def security_type(self):
        """Return the RFB security type implemented by this scheme."""
        return auth.AuthType.VENCRYPT

    def security_handshake(self, compute_sock):
        """Run the client side of the VeNCrypt handshake.

        Negotiates VeNCrypt version 0.2 and the X509NONE subtype with the
        server on the other end of ``compute_sock`` and then starts TLS on
        that socket.

        :param compute_sock: connected socket to the VNC server; the
            negotiation is done with ``recv``/``sendall`` on it.
        :returns: the TLS-wrapped socket (``ssl.SSLSocket``) to use for all
            further traffic.
        :raises: exception.RFBAuthHandshakeFailed if the server closes the
            connection early, proposes an unsupported version, does not
            offer or accept X509NONE, or the TLS setup fails with an
            ``ssl.SSLError``.
        """
        def recv(num):
            # A stream socket may hand back fewer bytes than requested in
            # a single call, so keep reading until we have everything or
            # the peer closes the connection (empty read).
            data = compute_sock.recv(num)
            while 0 < len(data) < num:
                more = compute_sock.recv(num - len(data))
                if not more:
                    break
                data += more

            if len(data) != num:
                reason = _("Short read from compute socket, wanted "
                           "%(wanted)d bytes but got %(got)d") % {
                               'wanted': num, 'got': len(data)}
                raise exception.RFBAuthHandshakeFailed(reason=reason)
            return data

        def describe(subtype):
            # The server is free to advertise subtypes this enum does not
            # know about; that must not break the handshake just because
            # we want to log them.
            try:
                name = AuthVeNCryptSubtype(subtype).name
            except ValueError:
                name = 'unknown'
            return '%d (%s)' % (subtype, name)

        # get the VeNCrypt version from the server
        maj_ver = ord(recv(1))
        min_ver = ord(recv(1))

        LOG.debug("Server sent VeNCrypt version "
                  "%(maj)s.%(min)s", {'maj': maj_ver, 'min': min_ver})

        if maj_ver != 0 or min_ver != 2:
            reason = _("Only VeNCrypt version 0.2 is supported by this "
                       "proxy, but the server wanted to use version "
                       "%(maj)s.%(min)s") % {'maj': maj_ver, 'min': min_ver}
            raise exception.RFBAuthHandshakeFailed(reason=reason)

        # use version 0.2
        compute_sock.sendall(b"\x00\x02")

        # the server answers with a single byte, 0 meaning "OK"
        can_use_version = ord(recv(1))

        if can_use_version > 0:
            reason = _("Server could not use VeNCrypt version 0.2")
            raise exception.RFBAuthHandshakeFailed(reason=reason)

        # get the supported auth subtypes: a one byte count followed by
        # that many big-endian unsigned integers
        sub_types_cnt = ord(recv(1))
        sub_types_raw = recv(sub_types_cnt * auth.SUBTYPE_LENGTH)
        sub_types = struct.unpack('!' + str(sub_types_cnt) + 'I',
                                  sub_types_raw)

        LOG.debug(
            "Server supports VeNCrypt subtypes: %s",
            ', '.join(describe(t) for t in sub_types))

        # We use X509None as we're only seeking to encrypt the channel (ruling
        # out PLAIN) and prevent MITM (ruling out TLS*, which uses trivially
        # MITM'd Anonymous Diffie Hellmann (DH) cyphers)
        if AuthVeNCryptSubtype.X509NONE not in sub_types:
            reason = _(
                "Server does not support the {value} ({name}) "
                "VeNCrypt auth subtype"
            ).format(value=AuthVeNCryptSubtype.X509NONE.value,
                     name=AuthVeNCryptSubtype.X509NONE.name)
            raise exception.RFBAuthHandshakeFailed(reason=reason)

        LOG.debug(
            "Attempting to use the %d (%s) VeNCrypt auth subtype",
            AuthVeNCryptSubtype.X509NONE.value,
            AuthVeNCryptSubtype.X509NONE.name)

        compute_sock.sendall(struct.pack(
            '!I', AuthVeNCryptSubtype.X509NONE))

        # NB(sross): the spec is missing a U8 here that's used in
        # multiple implementations (e.g. QEMU, GTK-VNC). 1 means
        # acceptance, 0 means failure (unlike the rest of RFB)
        auth_accepted = ord(recv(1))
        if auth_accepted == 0:
            reason = _(
                "Server didn't accept the requested VeNCrypt auth subtype")
            raise exception.RFBAuthHandshakeFailed(reason=reason)

        LOG.debug("Server accepted the requested VeNCrypt auth subtype")

        # A client certificate is only presented when both the key and the
        # certificate are configured.
        if CONF.vnc.vencrypt_client_key and CONF.vnc.vencrypt_client_cert:
            client_key = CONF.vnc.vencrypt_client_key
            client_cert = CONF.vnc.vencrypt_client_cert
        else:
            client_key = None
            client_cert = None

        ca_certs = CONF.vnc.vencrypt_ca_certs

        try:
            # ssl.wrap_socket() was removed in Python 3.12; build the
            # equivalent context explicitly instead.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # No server_hostname is passed to wrap_socket() below, so
            # hostname checking has to be off (it must be disabled before
            # verify_mode may be changed). The server certificate is still
            # required and verified against the configured CA bundle.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_REQUIRED
            if ca_certs:
                context.load_verify_locations(cafile=ca_certs)
            if client_cert:
                context.load_cert_chain(
                    certfile=client_cert, keyfile=client_key)

            wrapped_sock = context.wrap_socket(
                compute_sock, server_side=False)
        except ssl.SSLError as e:
            reason = _("Error establishing TLS connection to server: %s")
            raise exception.RFBAuthHandshakeFailed(
                reason=reason % str(e)) from e

        LOG.info("VeNCrypt security handshake accepted")
        return wrapped_sock